// blob: fc292dd4bacc1876a47e7ca2017ed13c758fea72 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.epam.dlab.backendapi.dao;
import com.epam.dlab.dto.SchedulerJobDTO;
import com.epam.dlab.dto.UserInstanceStatus;
import com.epam.dlab.dto.base.DataEngineType;
import com.epam.dlab.model.scheduler.SchedulerJobData;
import com.google.inject.Singleton;
import com.mongodb.client.FindIterable;
import com.mongodb.client.model.Filters;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.bson.Document;
import org.bson.conversions.Bson;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static com.epam.dlab.backendapi.dao.ComputationalDAO.COMPUTATIONAL_NAME;
import static com.epam.dlab.backendapi.dao.ComputationalDAO.IMAGE;
import static com.epam.dlab.backendapi.dao.ComputationalDAO.PROJECT;
import static com.epam.dlab.backendapi.dao.ExploratoryDAO.COMPUTATIONAL_RESOURCES;
import static com.epam.dlab.backendapi.dao.ExploratoryDAO.EXPLORATORY_NAME;
import static com.epam.dlab.backendapi.dao.ExploratoryDAO.exploratoryCondition;
import static com.epam.dlab.backendapi.dao.MongoCollections.USER_INSTANCES;
import static com.epam.dlab.dto.base.DataEngineType.fromDockerImageName;
import static com.mongodb.client.model.Filters.and;
import static com.mongodb.client.model.Filters.eq;
import static com.mongodb.client.model.Filters.exists;
import static com.mongodb.client.model.Filters.in;
import static com.mongodb.client.model.Filters.lte;
import static com.mongodb.client.model.Filters.ne;
import static com.mongodb.client.model.Filters.or;
import static com.mongodb.client.model.Projections.excludeId;
import static com.mongodb.client.model.Projections.fields;
import static com.mongodb.client.model.Projections.include;
import static java.util.stream.Collectors.toList;
/**
 * DAO for user's scheduler jobs.
 * <p>
 * Scheduler settings are read from the {@link #SCHEDULER_DATA} sub-document, which is looked up either directly
 * on a user instance (exploratory) document or on an element of its computational resources array.
 */
@Slf4j
@Singleton
public class SchedulerJobDAO extends BaseDAO {

    static final String SCHEDULER_DATA = "scheduler_data";
    private static final String CONSIDER_INACTIVITY_FLAG = SCHEDULER_DATA + ".consider_inactivity";
    public static final String TIMEZONE_PREFIX = "UTC";
    private static final String LAST_ACTIVITY = "last_activity";
    private static final String CHECK_INACTIVITY_REQUIRED = "check_inactivity_required";
    private static final String CHECK_INACTIVITY_FLAG = SCHEDULER_DATA + "." + CHECK_INACTIVITY_REQUIRED;

    public SchedulerJobDAO() {
        log.info("{} is initialized", getClass().getSimpleName());
    }

    /**
     * Returns condition for searching documents whose scheduler data exists and is not null.
     *
     * @return Bson condition.
     */
    private Bson schedulerNotNullCondition() {
        return and(exists(SCHEDULER_DATA), ne(SCHEDULER_DATA, null));
    }

    /**
     * Finds and returns the info of user's single scheduler job by exploratory name.
     *
     * @param user            user name.
     * @param project         project name.
     * @param exploratoryName the name of exploratory.
     * @return scheduler job data, or empty if no matching exploratory with non-null scheduler data is found.
     */
    public Optional<SchedulerJobDTO> fetchSingleSchedulerJobByUserAndExploratory(String user, String project,
                                                                                 String exploratoryName) {
        return findOne(USER_INSTANCES,
                and(exploratoryCondition(user, exploratoryName, project), schedulerNotNullCondition()),
                fields(include(SCHEDULER_DATA), excludeId()))
                .map(d -> convertFromDocument((Document) d.get(SCHEDULER_DATA), SchedulerJobDTO.class));
    }

    /**
     * Finds and returns the info of user's single scheduler job for computational resource.
     *
     * @param user              user name.
     * @param project           project name.
     * @param exploratoryName   the name of exploratory.
     * @param computationalName the name of computational resource.
     * @return scheduler job data, or empty if the exploratory, the computational resource or its scheduler
     * data is absent.
     */
    @SuppressWarnings("unchecked")
    public Optional<SchedulerJobDTO> fetchSingleSchedulerJobForCluster(String user, String project,
                                                                       String exploratoryName,
                                                                       String computationalName) {
        return findOne(USER_INSTANCES,
                exploratoryCondition(user, exploratoryName, project),
                fields(include(COMPUTATIONAL_RESOURCES), excludeId()))
                .map(d -> (List<Document>) d.get(COMPUTATIONAL_RESOURCES))
                // flatMap propagates "no such computational resource" as an empty Optional directly,
                // without allocating a placeholder Document
                .flatMap(resources -> resources.stream()
                        .filter(resource -> hasComputationalName(resource, computationalName))
                        .findAny())
                // Optional.map turns a null scheduler_data value into an empty Optional
                .map(resource -> (Document) resource.get(SCHEDULER_DATA))
                .map(schedulerData -> convertFromDocument(schedulerData, SchedulerJobDTO.class));
    }

    /**
     * Null-safe check of the computational resource name; a resource document without a name never matches
     * (instead of failing with a NullPointerException).
     *
     * @param resource     computational resource document.
     * @param expectedName the name to compare with.
     * @return <b>true</b> if the resource has a name and it equals to the expected one.
     */
    private static boolean hasComputationalName(Document resource, String expectedName) {
        final String actualName = resource.getString(COMPUTATIONAL_NAME);
        return actualName != null && actualName.equals(expectedName);
    }

    /**
     * Finds and returns the list of all scheduler jobs for starting/stopping/terminating exploratories regarding
     * to parameter passed. Only exploratories with non-null scheduler data and the flag
     * {@code check_inactivity_required} equal to <b>false</b> are selected.
     *
     * @param status 'running' value for starting exploratory, 'stopped' - for stopping and 'terminated' - for
     *               terminating.
     * @return list of scheduler jobs.
     */
    public List<SchedulerJobData> getExploratorySchedulerDataWithStatus(UserInstanceStatus status) {
        return toSchedulerJobDataList(userInstancesWithScheduler(eq(STATUS, status.toString())));
    }

    /**
     * Finds exploratories in the given status having non-null scheduler data, for which one of the following is
     * true:
     * <ul>
     * <li>the flag {@code consider_inactivity} is <b>false</b>;</li>
     * <li>the flag {@code consider_inactivity} is <b>true</b> and the list of computational resources is
     * empty;</li>
     * <li>the flag {@code consider_inactivity} is <b>true</b> and at least one computational resource has
     * {@code last_activity} less than or equal to the date passed.</li>
     * </ul>
     *
     * @param status       status of exploratory.
     * @param lastActivity upper bound (inclusive) for last activity of computational resource.
     * @return list of scheduler jobs.
     */
    public List<SchedulerJobData> getExploratorySchedulerWithStatusAndClusterLastActivityLessThan(
            UserInstanceStatus status, Date lastActivity) {
        final Bson noClusters = eq(COMPUTATIONAL_RESOURCES, Collections.emptyList());
        final Bson inactiveClusterExists = and(
                ne(COMPUTATIONAL_RESOURCES, Collections.emptyList()),
                Filters.elemMatch(COMPUTATIONAL_RESOURCES, lte(LAST_ACTIVITY, lastActivity)));
        final Bson inactivityCondition = or(
                and(eq(CONSIDER_INACTIVITY_FLAG, true), or(noClusters, inactiveClusterExists)),
                eq(CONSIDER_INACTIVITY_FLAG, false));

        return toSchedulerJobDataList(find(USER_INSTANCES,
                and(eq(STATUS, status.toString()), schedulerNotNullCondition(), inactivityCondition),
                fields(excludeId(), include(USER, PROJECT, EXPLORATORY_NAME, SCHEDULER_DATA))));
    }

    /**
     * Finds and returns the list of scheduler jobs of exploratories which are in one of the statuses passed.
     * Only exploratories with non-null scheduler data and the flag {@code check_inactivity_required} equal to
     * <b>false</b> are selected.
     *
     * @param statuses acceptable statuses of exploratory.
     * @return list of scheduler jobs.
     */
    public List<SchedulerJobData> getExploratorySchedulerDataWithOneOfStatus(UserInstanceStatus... statuses) {
        final List<String> statusNames = Arrays.stream(statuses)
                .map(UserInstanceStatus::toString)
                .collect(toList());
        return toSchedulerJobDataList(userInstancesWithScheduler(in(STATUS, statusNames)));
    }

    /**
     * Finds and returns the list of scheduler jobs of computational resources of the given type which are in one
     * of the statuses passed and belong to exploratories in the given status.
     *
     * @param exploratoryStatus status of exploratory.
     * @param dataEngineType    type of computational resource, resolved from its docker image name.
     * @param statuses          acceptable statuses of computational resource.
     * @return list of scheduler jobs.
     */
    public List<SchedulerJobData> getComputationalSchedulerDataWithOneOfStatus(UserInstanceStatus exploratoryStatus,
                                                                               DataEngineType dataEngineType,
                                                                               UserInstanceStatus... statuses) {
        return stream(computationalResourcesWithScheduler(exploratoryStatus))
                .flatMap(doc -> computationalSchedulerDataStream(doc, dataEngineType, statuses))
                .collect(toList());
    }

    /**
     * Finds and returns the list of scheduler jobs of computational resources (of any type) which are in one of
     * the statuses passed and belong to exploratories in the given status.
     *
     * @param exploratoryStatus status of exploratory.
     * @param statuses          acceptable statuses of computational resource.
     * @return list of scheduler jobs.
     */
    public List<SchedulerJobData> getComputationalSchedulerDataWithOneOfStatus(UserInstanceStatus exploratoryStatus,
                                                                               UserInstanceStatus... statuses) {
        return stream(computationalResourcesWithScheduler(exploratoryStatus))
                .flatMap(doc -> computationalSchedulerData(doc, statuses)
                        .map(compResource -> toSchedulerData(doc, compResource)))
                .collect(toList());
    }

    /**
     * Finds exploratories in the given status which have at least one computational resource with non-null
     * scheduler data.
     *
     * @param exploratoryStatus status of exploratory.
     * @return found documents containing user, project, exploratory name and computational resources.
     */
    private FindIterable<Document> computationalResourcesWithScheduler(UserInstanceStatus exploratoryStatus) {
        final Bson computationalSchedulerCondition =
                Filters.elemMatch(COMPUTATIONAL_RESOURCES, schedulerNotNullCondition());
        return find(USER_INSTANCES,
                and(eq(STATUS, exploratoryStatus.toString()), computationalSchedulerCondition),
                fields(excludeId(), include(USER, PROJECT, EXPLORATORY_NAME, COMPUTATIONAL_RESOURCES)));
    }

    /**
     * Removes scheduler data from exploratory.
     * <p>
     * NOTE(review): the document is matched by user and exploratory name only, the project is not taken into
     * account (unlike the fetch methods of this class) - confirm that this is intended.
     *
     * @param user        user name.
     * @param exploratory the name of exploratory.
     */
    public void removeScheduler(String user, String exploratory) {
        updateOne(USER_INSTANCES, and(eq(USER, user), eq(EXPLORATORY_NAME, exploratory)),
                unset(SCHEDULER_DATA, StringUtils.EMPTY));
    }

    /**
     * Removes scheduler data from the computational resource of exploratory.
     * <p>
     * NOTE(review): the document is matched by user, exploratory name and computational name only, the project
     * is not taken into account - confirm that this is intended.
     *
     * @param user          user name.
     * @param exploratory   the name of exploratory.
     * @param computational the name of computational resource.
     */
    public void removeScheduler(String user, String exploratory, String computational) {
        updateOne(USER_INSTANCES, and(eq(USER, user), eq(EXPLORATORY_NAME, exploratory),
                Filters.elemMatch(COMPUTATIONAL_RESOURCES, eq(COMPUTATIONAL_NAME, computational))),
                // positional operator "$" addresses the array element matched by elemMatch above
                unset(COMPUTATIONAL_RESOURCES + ".$." + SCHEDULER_DATA, StringUtils.EMPTY));
    }

    /**
     * Finds exploratories which satisfy the status condition, have non-null scheduler data and the flag
     * {@code check_inactivity_required} equal to <b>false</b>.
     *
     * @param statusCondition condition for status of exploratory.
     * @return found documents containing user, exploratory name, project and scheduler data.
     */
    private FindIterable<Document> userInstancesWithScheduler(Bson statusCondition) {
        return find(USER_INSTANCES,
                and(
                        statusCondition,
                        schedulerNotNullCondition(),
                        eq(CHECK_INACTIVITY_FLAG, false)
                ),
                fields(excludeId(), include(USER, EXPLORATORY_NAME, PROJECT, SCHEDULER_DATA)));
    }

    /**
     * Converts every document found to {@link SchedulerJobData}.
     *
     * @param documents documents of user instances.
     * @return list of scheduler jobs.
     */
    private List<SchedulerJobData> toSchedulerJobDataList(FindIterable<Document> documents) {
        return stream(documents)
                .map(d -> convertFromDocument(d, SchedulerJobData.class))
                .collect(toList());
    }

    /**
     * Returns scheduler jobs of computational resources of the user instance which have scheduler data, are in
     * one of the statuses passed and whose docker image corresponds to the given type.
     *
     * @param doc                   document of user instance.
     * @param computationalType     required type of computational resource.
     * @param computationalStatuses acceptable statuses of computational resource.
     * @return stream of scheduler jobs.
     */
    private Stream<SchedulerJobData> computationalSchedulerDataStream(Document doc, DataEngineType computationalType,
                                                                      UserInstanceStatus... computationalStatuses) {
        return computationalSchedulerData(doc, computationalStatuses)
                .filter(compResource -> fromDockerImageName(compResource.getString(IMAGE)) == computationalType)
                .map(compResource -> toSchedulerData(doc, compResource));
    }

    /**
     * Builds scheduler job data from the document of user instance and the document of its computational
     * resource.
     *
     * @param userInstanceDocument document of user instance (source of user, project and exploratory name).
     * @param compResource         document of computational resource (source of its name and scheduler data).
     * @return scheduler job data.
     */
    private SchedulerJobData toSchedulerData(Document userInstanceDocument, Document compResource) {
        final String user = userInstanceDocument.getString(USER);
        final String project = userInstanceDocument.getString(PROJECT);
        final String exploratoryName = userInstanceDocument.getString(EXPLORATORY_NAME);
        final String computationalName = compResource.getString(COMPUTATIONAL_NAME);
        final SchedulerJobDTO schedulerData = convertFromDocument((Document) compResource.get(SCHEDULER_DATA),
                SchedulerJobDTO.class);
        return new SchedulerJobData(user, exploratoryName, computationalName, project, schedulerData);
    }

    /**
     * Returns computational resources of the user instance which have non-null scheduler data and are in one of
     * the statuses passed.
     *
     * @param doc                   document of user instance; expected to contain the list of computational
     *                              resources.
     * @param computationalStatuses acceptable statuses of computational resource.
     * @return stream of documents of computational resources.
     */
    @SuppressWarnings("unchecked")
    private Stream<Document> computationalSchedulerData(Document doc, UserInstanceStatus... computationalStatuses) {
        final Set<String> statusSet = Arrays.stream(computationalStatuses)
                .map(UserInstanceStatus::toString)
                .collect(Collectors.toSet());
        return ((List<Document>) doc.get(COMPUTATIONAL_RESOURCES))
                .stream()
                .filter(compResource -> Objects.nonNull(compResource.get(SCHEDULER_DATA)) &&
                        statusSet.contains(compResource.getString(STATUS)));
    }
}