blob: 95216b6f90f8ea1ff9c5ca023fd3714f354b1f23 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.epam.datalab.backendapi.dao;
import com.epam.datalab.backendapi.util.DateRemoverUtil;
import com.epam.datalab.dto.ResourceURL;
import com.epam.datalab.dto.SchedulerJobDTO;
import com.epam.datalab.dto.StatusEnvBaseDTO;
import com.epam.datalab.dto.UserInstanceDTO;
import com.epam.datalab.dto.UserInstanceStatus;
import com.epam.datalab.dto.aws.computational.ClusterConfig;
import com.epam.datalab.dto.base.DataEngineType;
import com.epam.datalab.dto.computational.ComputationalStatusDTO;
import com.epam.datalab.dto.computational.UserComputationalResource;
import com.epam.datalab.exceptions.DatalabException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.mongodb.client.model.Filters;
import com.mongodb.client.result.UpdateResult;
import lombok.extern.slf4j.Slf4j;
import org.bson.Document;
import org.bson.conversions.Bson;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import static com.epam.datalab.backendapi.dao.ExploratoryDAO.COMPUTATIONAL_RESOURCES;
import static com.epam.datalab.backendapi.dao.ExploratoryDAO.UPTIME;
import static com.epam.datalab.backendapi.dao.ExploratoryDAO.exploratoryCondition;
import static com.epam.datalab.backendapi.dao.MongoCollections.USER_INSTANCES;
import static com.epam.datalab.backendapi.dao.SchedulerJobDAO.SCHEDULER_DATA;
import static com.epam.datalab.dto.UserInstanceStatus.TERMINATED;
import static com.mongodb.client.model.Filters.and;
import static com.mongodb.client.model.Filters.eq;
import static com.mongodb.client.model.Filters.in;
import static com.mongodb.client.model.Filters.ne;
import static com.mongodb.client.model.Filters.not;
import static com.mongodb.client.model.Projections.elemMatch;
import static com.mongodb.client.model.Projections.excludeId;
import static com.mongodb.client.model.Projections.fields;
import static com.mongodb.client.model.Projections.include;
import static com.mongodb.client.model.Updates.push;
import static com.mongodb.client.model.Updates.set;
import static java.util.stream.Collectors.toList;
/**
* DAO for user computational resources.
*/
@Slf4j
public class ComputationalDAO extends BaseDAO {
static final String COMPUTATIONAL_NAME = "computational_name";
static final String COMPUTATIONAL_ID = "computational_id";
static final String PROJECT = "project";
static final String ENDPOINT = "endpoint";
static final String IMAGE = "image";
private static final String COMPUTATIONAL_URL = "computational_url";
private static final String EXPLORATORY_NAME = "exploratory_name";
private static final String COMPUTATIONAL_URL_DESC = "description";
private static final String COMPUTATIONAL_URL_URL = "url";
private static final String COMPUTATIONAL_LAST_ACTIVITY = "last_activity";
private static final String CONFIG = "config";
private static String computationalFieldFilter(String fieldName) {
return COMPUTATIONAL_RESOURCES + FIELD_SET_DELIMETER + fieldName;
}
private static Bson computationalCondition(String user, String project, String exploratoryName, String compName) {
return and(eq(USER, user), eq(PROJECT, project), eq(EXPLORATORY_NAME, exploratoryName),
eq(COMPUTATIONAL_RESOURCES + "." + COMPUTATIONAL_NAME, compName));
}
/**
* Add the user's computational resource for notebook into database.
*
* @param user user name.
* @param exploratoryName name of exploratory.
* @param project name of project
* @param computationalDTO object of computational resource.
* @return <b>true</b> if operation was successful, otherwise <b>false</b>.
*/
public boolean addComputational(String user, String exploratoryName, String project,
UserComputationalResource computationalDTO) {
final UpdateResult updateResult = updateOne(USER_INSTANCES,
and(exploratoryCondition(user, exploratoryName, project),
not(elemMatch(COMPUTATIONAL_RESOURCES,
eq(COMPUTATIONAL_NAME, computationalDTO.getComputationalName())))),
push(COMPUTATIONAL_RESOURCES, convertToBson(computationalDTO)));
return updateResult.getModifiedCount() > 0;
}
/**
* Finds and returns the of computational resource.
*
* @param user user name.
* @param project project name
* @param exploratoryName the name of exploratory.
* @param computationalName name of computational resource.
* @throws DatalabException if exception occurs
*/
public UserComputationalResource fetchComputationalFields(String user, String project, String exploratoryName,
String computationalName) {
Optional<UserInstanceDTO> opt = findOne(USER_INSTANCES,
and(exploratoryCondition(user, exploratoryName, project),
Filters.elemMatch(COMPUTATIONAL_RESOURCES, eq(COMPUTATIONAL_NAME, computationalName))),
fields(include(COMPUTATIONAL_RESOURCES + ".$"), excludeId()),
UserInstanceDTO.class);
return opt.map(UserInstanceDTO::getResources)
.filter(l -> !l.isEmpty())
.map(l -> l.get(0))
.orElseThrow(() -> new DatalabException("Computational resource " + computationalName + " for user " + user + " with " +
"exploratory name " + exploratoryName + " not found."));
}
public List<UserComputationalResource> findComputationalResourcesWithStatus(String user, String project, String exploratoryName,
UserInstanceStatus status) {
final UserInstanceDTO userInstanceDTO = findOne(USER_INSTANCES,
and(exploratoryCondition(user, exploratoryName, project),
elemMatch(COMPUTATIONAL_RESOURCES, eq(STATUS, status.toString()))),
fields(include(COMPUTATIONAL_RESOURCES), excludeId()),
UserInstanceDTO.class)
.orElseThrow(() -> new DatalabException(String.format("Computational resource with status %s for user " +
"%s with exploratory name %s not found.", status, user, exploratoryName)));
return userInstanceDTO.getResources()
.stream()
.filter(computationalResource -> computationalResource.getStatus().equals(status.toString()))
.collect(toList());
}
/**
* Updates the status of computational resource in Mongo database.
*
* @param dto object of computational resource status.
* @return The result of an update operation.
*/
public UpdateResult updateComputationalStatus(ComputationalStatusDTO dto) {
try {
Document values = new Document(computationalFieldFilter(STATUS), dto.getStatus());
return updateOne(USER_INSTANCES,
and(exploratoryCondition(dto.getUser(), dto.getExploratoryName(), dto.getProject()),
elemMatch(COMPUTATIONAL_RESOURCES,
and(eq(COMPUTATIONAL_NAME, dto.getComputationalName()),
not(eq(STATUS, TERMINATED.toString()))))),
new Document(SET, values));
} catch (Exception t) {
throw new DatalabException("Could not update computational resource status", t);
}
}
/**
* Updates the status of exploratory notebooks in Mongo database.
*
* @param dto object of exploratory status info.
* @return The result of an update operation.
*/
public int updateComputationalStatusesForExploratory(StatusEnvBaseDTO<?> dto) {
Document values = new Document(computationalFieldFilter(STATUS), dto.getStatus());
values.append(computationalFieldFilter(UPTIME), null);
int count = 0;
UpdateResult result;
do {
result = updateOne(USER_INSTANCES,
and(exploratoryCondition(dto.getUser(), dto.getExploratoryName(), dto.getProject()),
elemMatch(COMPUTATIONAL_RESOURCES,
and(not(eq(STATUS, TERMINATED.toString())),
not(eq(STATUS, dto.getStatus()))))),
new Document(SET, values));
count += result.getModifiedCount();
}
while (result.getModifiedCount() > 0);
return count;
}
public void updateComputationalStatusesForExploratory(String user, String project, String exploratoryName,
UserInstanceStatus dataengineStatus,
UserInstanceStatus dataengineServiceStatus,
UserInstanceStatus... excludedStatuses) {
updateComputationalResource(user, project, exploratoryName, dataengineStatus,
DataEngineType.SPARK_STANDALONE, excludedStatuses);
updateComputationalResource(user, project, exploratoryName, dataengineServiceStatus,
DataEngineType.CLOUD_SERVICE, excludedStatuses);
}
/**
* Updates the status for single computational resource in Mongo database.
*
* @param user user name.
* @param project project name
* @param exploratoryName exploratory's name.
* @param computationalName name of computational resource.
* @param newStatus new status of computational resource.
*/
public void updateStatusForComputationalResource(String user, String project, String exploratoryName,
String computationalName, UserInstanceStatus newStatus) {
updateComputationalField(user, project, exploratoryName, computationalName, STATUS, newStatus.toString());
}
private void updateComputationalResource(String user, String project, String exploratoryName,
UserInstanceStatus dataengineServiceStatus, DataEngineType cloudService,
UserInstanceStatus... excludedStatuses) {
UpdateResult result;
do {
result = updateMany(USER_INSTANCES,
computationalFilter(user, project, exploratoryName,
dataengineServiceStatus.toString(), DataEngineType.getDockerImageName(cloudService), excludedStatuses),
new Document(SET,
new Document(computationalFieldFilter(STATUS), dataengineServiceStatus.toString())));
} while (result.getModifiedCount() > 0);
}
private Bson computationalFilter(String user, String project, String exploratoryName, String computationalStatus,
String computationalImage, UserInstanceStatus[] excludedStatuses) {
final String[] statuses = Arrays.stream(excludedStatuses)
.map(UserInstanceStatus::toString)
.toArray(String[]::new);
return and(exploratoryCondition(user, exploratoryName, project),
elemMatch(COMPUTATIONAL_RESOURCES, and(eq(IMAGE, computationalImage),
not(in(STATUS, statuses)),
not(eq(STATUS, computationalStatus)))));
}
/**
* Updates the info of computational resource in Mongo database.
*
* @param dto object of computational resource status.
* @return The result of an update operation.
* @throws DatalabException if exception occurs
*/
public UpdateResult updateComputationalFields(ComputationalStatusDTO dto) {
try {
Document values = new Document(computationalFieldFilter(STATUS), dto.getStatus());
if (dto.getUptime() != null) {
values.append(computationalFieldFilter(UPTIME), dto.getUptime());
}
if (dto.getInstanceId() != null) {
values.append(computationalFieldFilter(INSTANCE_ID), dto.getInstanceId());
}
if (null != dto.getErrorMessage()) {
values.append(computationalFieldFilter(ERROR_MESSAGE),
DateRemoverUtil.removeDateFormErrorMessage(dto.getErrorMessage()));
}
if (dto.getComputationalId() != null) {
values.append(computationalFieldFilter(COMPUTATIONAL_ID), dto.getComputationalId());
}
if (dto.getResourceUrl() != null && !dto.getResourceUrl().isEmpty()) {
values.append(computationalFieldFilter(COMPUTATIONAL_URL), getResourceUrlData(dto));
}
if (dto.getLastActivity() != null) {
values.append(computationalFieldFilter(COMPUTATIONAL_LAST_ACTIVITY), dto.getLastActivity());
}
if (dto.getConfig() != null) {
values.append(computationalFieldFilter(CONFIG),
dto.getConfig().stream().map(this::convertToBson).collect(toList()));
}
return updateOne(USER_INSTANCES, and(exploratoryCondition(dto.getUser(), dto.getExploratoryName(), dto.getProject()),
elemMatch(COMPUTATIONAL_RESOURCES,
and(eq(COMPUTATIONAL_NAME, dto.getComputationalName()),
not(eq(STATUS, TERMINATED.toString()))))),
new Document(SET, values));
} catch (Exception t) {
throw new DatalabException("Could not update computational resource status", t);
}
}
private List<Map<String, String>> getResourceUrlData(ComputationalStatusDTO dto) {
return dto.getResourceUrl().stream()
.map(this::toUrlDocument)
.collect(toList());
}
private LinkedHashMap<String, String> toUrlDocument(ResourceURL url) {
LinkedHashMap<String, String> map = new LinkedHashMap<>();
map.put(COMPUTATIONAL_URL_DESC, url.getDescription());
map.put(COMPUTATIONAL_URL_URL, url.getUrl());
return map;
}
/**
* Updates the requirement for reuploading key for single computational resource in Mongo database.
*
* @param user user name.
* @param project project name
* @param exploratoryName exploratory's name.
* @param computationalName name of computational resource.
* @param reuploadKeyRequired true/false.
*/
public void updateReuploadKeyFlagForComputationalResource(String user, String project, String exploratoryName,
String computationalName, boolean reuploadKeyRequired) {
updateComputationalField(user, project, exploratoryName, computationalName, REUPLOAD_KEY_REQUIRED, reuploadKeyRequired);
}
/**
* Returns names of computational resources which status is among existing ones. Also these resources will
* have predefined type.
*
* @param user user name.
* @param project project name
* @param computationalTypes type list of computational resource which may contain 'dataengine' and/or
* 'dataengine-service'.
* @param exploratoryName name of exploratory.
* @param computationalStatuses statuses of computational resource.
* @return list of computational resources' names
*/
@SuppressWarnings("unchecked")
public List<String> getComputationalResourcesWhereStatusIn(String user, String project,
List<DataEngineType> computationalTypes,
String exploratoryName,
UserInstanceStatus... computationalStatuses) {
return stream((List<Document>) find(USER_INSTANCES, exploratoryCondition(user, exploratoryName, project),
fields(include(COMPUTATIONAL_RESOURCES))).first().get(COMPUTATIONAL_RESOURCES))
.filter(doc ->
statusList(computationalStatuses).contains(doc.getString(STATUS)) &&
computationalTypes.contains(DataEngineType.fromDockerImageName(doc.getString(IMAGE))))
.map(doc -> doc.getString(COMPUTATIONAL_NAME)).collect(toList());
}
@SuppressWarnings("unchecked")
public List<ClusterConfig> getClusterConfig(String user, String project, String exploratoryName, String computationalName) {
return findOne(USER_INSTANCES,
and(exploratoryCondition(user, exploratoryName, project),
Filters.elemMatch(COMPUTATIONAL_RESOURCES, and(eq(COMPUTATIONAL_NAME, computationalName),
notNull(CONFIG)))),
fields(include(COMPUTATIONAL_RESOURCES + ".$"), excludeId())
).map(d -> ((List<Document>) d.get(COMPUTATIONAL_RESOURCES)).get(0))
.map(d -> convertFromDocument((List<Document>) d.get(CONFIG),
new TypeReference<List<ClusterConfig>>() {
}))
.orElse(Collections.emptyList());
}
/**
* Updates computational resource's field.
*
* @param user user name.
* @param project project name
* @param exploratoryName name of exploratory.
* @param computationalName name of computational resource.
* @param fieldName computational field's name for updating.
* @param fieldValue computational field's value for updating.
*/
private <T> UpdateResult updateComputationalField(String user, String project, String exploratoryName, String computationalName,
String fieldName, T fieldValue) {
return updateOne(USER_INSTANCES,
computationalCondition(user, project, exploratoryName, computationalName),
set(computationalFieldFilter(fieldName), fieldValue));
}
public void updateSchedulerSyncFlag(String user, String project, String exploratoryName, boolean syncFlag) {
final String syncStartField = SCHEDULER_DATA + ".sync_start_required";
UpdateResult result;
do {
result = updateOne(USER_INSTANCES, and(exploratoryCondition(user, exploratoryName, project),
elemMatch(COMPUTATIONAL_RESOURCES, and(ne(SCHEDULER_DATA, null), ne(syncStartField, syncFlag)))),
set(computationalFieldFilter(syncStartField), syncFlag));
} while (result.getModifiedCount() != 0);
}
public UpdateResult updateSchedulerDataForComputationalResource(String user, String project, String exploratoryName,
String computationalName, SchedulerJobDTO dto) {
return updateComputationalField(user, project, exploratoryName, computationalName,
SCHEDULER_DATA, Objects.isNull(dto) ? null : convertToBson(dto));
}
public void updateLastActivity(String user, String project, String exploratoryName,
String computationalName, LocalDateTime lastActivity) {
updateOne(USER_INSTANCES,
computationalCondition(user, project, exploratoryName, computationalName),
set(computationalFieldFilter(COMPUTATIONAL_LAST_ACTIVITY),
Date.from(lastActivity.atZone(ZoneId.systemDefault()).toInstant())));
}
public void updateComputeStatus(String project, String endpoint, String computeName, String instanceId, UserInstanceStatus status) {
updateOne(USER_INSTANCES,
and(eq(PROJECT, project), eq(ENDPOINT, endpoint), eq(COMPUTATIONAL_RESOURCES + "." + INSTANCE_ID, instanceId),
eq(COMPUTATIONAL_RESOURCES + "." + COMPUTATIONAL_NAME, computeName)),
set(computationalFieldFilter(STATUS), status.toString()));
}
}