| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package com.epam.dlab.backendapi.service.impl; |
| |
| import com.epam.dlab.auth.UserInfo; |
| import com.epam.dlab.backendapi.annotation.Audit; |
| import com.epam.dlab.backendapi.annotation.Project; |
| import com.epam.dlab.backendapi.annotation.ResourceName; |
| import com.epam.dlab.backendapi.annotation.User; |
| import com.epam.dlab.backendapi.dao.ComputationalDAO; |
| import com.epam.dlab.backendapi.dao.EnvDAO; |
| import com.epam.dlab.backendapi.dao.ExploratoryDAO; |
| import com.epam.dlab.backendapi.dao.SchedulerJobDAO; |
| import com.epam.dlab.backendapi.domain.RequestId; |
| import com.epam.dlab.backendapi.service.ComputationalService; |
| import com.epam.dlab.backendapi.service.ExploratoryService; |
| import com.epam.dlab.backendapi.service.SchedulerJobService; |
| import com.epam.dlab.backendapi.service.SecurityService; |
| import com.epam.dlab.dto.SchedulerJobDTO; |
| import com.epam.dlab.dto.UserInstanceDTO; |
| import com.epam.dlab.dto.UserInstanceStatus; |
| import com.epam.dlab.dto.base.DataEngineType; |
| import com.epam.dlab.dto.computational.UserComputationalResource; |
| import com.epam.dlab.exceptions.ResourceInappropriateStateException; |
| import com.epam.dlab.exceptions.ResourceNotFoundException; |
| import com.epam.dlab.model.scheduler.SchedulerJobData; |
| import com.epam.dlab.rest.client.RESTService; |
| import com.google.inject.Inject; |
| import com.google.inject.Singleton; |
| import com.google.inject.name.Named; |
| import lombok.extern.slf4j.Slf4j; |
| import org.apache.commons.lang3.StringUtils; |
| |
| import java.time.DayOfWeek; |
| import java.time.LocalDate; |
| import java.time.LocalDateTime; |
| import java.time.LocalTime; |
| import java.time.OffsetDateTime; |
| import java.time.ZoneOffset; |
| import java.time.temporal.ChronoUnit; |
| import java.util.Date; |
| import java.util.List; |
| import java.util.Objects; |
| import java.util.Optional; |
| import java.util.function.Predicate; |
| import java.util.stream.Collectors; |
| import java.util.stream.Stream; |
| |
| import static com.epam.dlab.backendapi.domain.AuditActionEnum.SET_UP_SCHEDULER; |
| import static com.epam.dlab.backendapi.domain.AuditResourceTypeEnum.COMPUTE; |
| import static com.epam.dlab.backendapi.domain.AuditResourceTypeEnum.NOTEBOOK; |
| import static com.epam.dlab.constants.ServiceConsts.PROVISIONING_SERVICE_NAME; |
| import static com.epam.dlab.dto.UserInstanceStatus.CONFIGURING; |
| import static com.epam.dlab.dto.UserInstanceStatus.CREATING; |
| import static com.epam.dlab.dto.UserInstanceStatus.RUNNING; |
| import static com.epam.dlab.dto.UserInstanceStatus.STARTING; |
| import static com.epam.dlab.dto.UserInstanceStatus.STOPPED; |
| import static com.epam.dlab.dto.UserInstanceStatus.STOPPING; |
| import static com.epam.dlab.dto.UserInstanceStatus.TERMINATING; |
| import static com.epam.dlab.dto.base.DataEngineType.getDockerImageName; |
| import static java.time.ZoneId.systemDefault; |
| import static java.util.Collections.singletonList; |
| import static java.util.Date.from; |
| |
| @Slf4j |
| @Singleton |
| public class SchedulerJobServiceImpl implements SchedulerJobService { |
| private static final String SCHEDULER_NOT_FOUND_MSG = "Scheduler job data not found for user %s with exploratory %s"; |
| private static final String AUDIT_MESSAGE = "Scheduled action, requested for notebook %s"; |
| private static final long ALLOWED_INACTIVITY_MINUTES = 1L; |
| |
| @Inject |
| private SchedulerJobDAO schedulerJobDAO; |
| |
| @Inject |
| private ExploratoryDAO exploratoryDAO; |
| |
| @Inject |
| private ComputationalDAO computationalDAO; |
| |
| @Inject |
| private ExploratoryService exploratoryService; |
| |
| @Inject |
| private ComputationalService computationalService; |
| |
| @Inject |
| private SecurityService securityService; |
| |
| @Inject |
| private EnvDAO envDAO; |
| |
| @Inject |
| private RequestId requestId; |
| |
| @Inject |
| @Named(PROVISIONING_SERVICE_NAME) |
| private RESTService provisioningService; |
| |
| @Override |
| public SchedulerJobDTO fetchSchedulerJobForUserAndExploratory(String user, String project, String exploratoryName) { |
| return schedulerJobDAO.fetchSingleSchedulerJobByUserAndExploratory(user, project, exploratoryName) |
| .orElseThrow(() -> new ResourceNotFoundException(String.format(SCHEDULER_NOT_FOUND_MSG, user, |
| exploratoryName))); |
| } |
| |
| @Override |
| public SchedulerJobDTO fetchSchedulerJobForComputationalResource(String user, String project, String exploratoryName, |
| String computationalName) { |
| return schedulerJobDAO.fetchSingleSchedulerJobForCluster(user, project, exploratoryName, computationalName) |
| .orElseThrow(() -> new ResourceNotFoundException(String.format(SCHEDULER_NOT_FOUND_MSG, user, |
| exploratoryName) + " with computational resource " + computationalName)); |
| } |
| |
| @Audit(action = SET_UP_SCHEDULER, type = NOTEBOOK) |
| @Override |
| public void updateExploratorySchedulerData(@User UserInfo user, @Project String project, @ResourceName String exploratoryName, SchedulerJobDTO dto) { |
| validateExploratoryStatus(user.getName(), project, exploratoryName); |
| populateDefaultSchedulerValues(dto); |
| log.debug("Updating exploratory {} for user {} with new scheduler job data: {}...", exploratoryName, user, |
| dto); |
| exploratoryDAO.updateSchedulerDataForUserAndExploratory(user.getName(), project, exploratoryName, dto); |
| |
| if (!dto.inactivityScheduler() && dto.isSyncStartRequired()) { |
| shareSchedulerJobDataToSparkClusters(user.getName(), project, exploratoryName, dto); |
| } else if (!dto.inactivityScheduler()) { |
| computationalDAO.updateSchedulerSyncFlag(user.getName(), project, exploratoryName, dto.isSyncStartRequired()); |
| } |
| } |
| |
| @Audit(action = SET_UP_SCHEDULER, type = COMPUTE) |
| @Override |
| public void updateComputationalSchedulerData(@User UserInfo user, @Project String project, String exploratoryName, @ResourceName String computationalName, SchedulerJobDTO dto) { |
| validateExploratoryStatus(user.getName(), project, exploratoryName); |
| validateComputationalStatus(user.getName(), project, exploratoryName, computationalName); |
| populateDefaultSchedulerValues(dto); |
| log.debug("Updating computational resource {} affiliated with exploratory {} for user {} with new scheduler " + |
| "job data {}...", computationalName, exploratoryName, user, dto); |
| computationalDAO.updateSchedulerDataForComputationalResource(user.getName(), project, exploratoryName, computationalName, dto); |
| } |
| |
| @Override |
| public void stopComputationalByScheduler() { |
| getComputationalSchedulersForStopping(OffsetDateTime.now(), true) |
| .forEach(this::stopComputational); |
| } |
| |
| @Override |
| public void stopExploratoryByScheduler() { |
| getExploratorySchedulersForStopping(OffsetDateTime.now(), true) |
| .forEach(this::stopExploratory); |
| } |
| |
| @Override |
| public void startExploratoryByScheduler() { |
| getExploratorySchedulersForStarting(OffsetDateTime.now()) |
| .forEach(this::startExploratory); |
| } |
| |
| @Override |
| public void startComputationalByScheduler() { |
| getComputationalSchedulersForStarting(OffsetDateTime.now()) |
| .forEach(job -> startSpark(job.getUser(), job.getExploratoryName(), job.getComputationalName(), |
| job.getProject())); |
| } |
| |
| @Override |
| public void terminateExploratoryByScheduler() { |
| getExploratorySchedulersForTerminating(OffsetDateTime.now()) |
| .forEach(this::terminateExploratory); |
| |
| } |
| |
| @Override |
| public void terminateComputationalByScheduler() { |
| getComputationalSchedulersForTerminating(OffsetDateTime.now()).forEach(this::terminateComputational); |
| |
| } |
| |
| @Override |
| public void removeScheduler(String user, String exploratoryName) { |
| schedulerJobDAO.removeScheduler(user, exploratoryName); |
| } |
| |
| @Override |
| public void removeScheduler(String user, String exploratoryName, String computationalName) { |
| schedulerJobDAO.removeScheduler(user, exploratoryName, computationalName); |
| } |
| |
| @Override |
| public List<SchedulerJobData> getActiveSchedulers(String user, long minutesOffset) { |
| final OffsetDateTime desiredDateTime = OffsetDateTime.now().plusMinutes(minutesOffset); |
| final Predicate<SchedulerJobData> userPredicate = s -> user.equals(s.getUser()); |
| final Stream<SchedulerJobData> computationalSchedulersStream = |
| getComputationalSchedulersForStopping(desiredDateTime) |
| .stream() |
| .filter(userPredicate); |
| final Stream<SchedulerJobData> exploratorySchedulersStream = |
| getExploratorySchedulersForStopping(desiredDateTime) |
| .stream() |
| .filter(userPredicate); |
| return Stream.concat(computationalSchedulersStream, exploratorySchedulersStream) |
| .collect(Collectors.toList()); |
| } |
| |
| private void stopComputational(SchedulerJobData job) { |
| final String project = job.getProject(); |
| final String expName = job.getExploratoryName(); |
| final String compName = job.getComputationalName(); |
| final String user = job.getUser(); |
| log.debug("Stopping exploratory {} computational {} for user {} by scheduler", expName, compName, user); |
| computationalService.stopSparkCluster(securityService.getServiceAccountInfo(user), user, project, expName, compName, String.format(AUDIT_MESSAGE, expName)); |
| } |
| |
| private void terminateComputational(SchedulerJobData job) { |
| final String user = job.getUser(); |
| final String expName = job.getExploratoryName(); |
| final String compName = job.getComputationalName(); |
| final UserInfo userInfo = securityService.getServiceAccountInfo(user); |
| log.debug("Terminating exploratory {} computational {} for user {} by scheduler", expName, compName, user); |
| computationalService.terminateComputational(userInfo, user, job.getProject(), expName, compName, String.format(AUDIT_MESSAGE, expName)); |
| } |
| |
| private void stopExploratory(SchedulerJobData job) { |
| final String expName = job.getExploratoryName(); |
| final String user = job.getUser(); |
| final String project = job.getProject(); |
| log.debug("Stopping exploratory {} for user {} by scheduler", expName, user); |
| exploratoryService.stop(securityService.getServiceAccountInfo(user), user, project, expName, String.format(AUDIT_MESSAGE, expName)); |
| } |
| |
| private List<SchedulerJobData> getExploratorySchedulersForTerminating(OffsetDateTime now) { |
| return schedulerJobDAO.getExploratorySchedulerDataWithOneOfStatus(RUNNING, STOPPED) |
| .stream() |
| .filter(canSchedulerForTerminatingBeApplied(now)) |
| .collect(Collectors.toList()); |
| } |
| |
| private List<SchedulerJobData> getComputationalSchedulersForTerminating(OffsetDateTime now) { |
| return schedulerJobDAO.getComputationalSchedulerDataWithOneOfStatus(RUNNING, STOPPED, RUNNING) |
| .stream() |
| .filter(canSchedulerForTerminatingBeApplied(now)) |
| .collect(Collectors.toList()); |
| } |
| |
| private void startExploratory(SchedulerJobData schedulerJobData) { |
| final String user = schedulerJobData.getUser(); |
| final String exploratoryName = schedulerJobData.getExploratoryName(); |
| final String project = schedulerJobData.getProject(); |
| log.debug("Starting exploratory {} for user {} by scheduler", exploratoryName, user); |
| exploratoryService.start(securityService.getServiceAccountInfo(user), exploratoryName, project, String.format(AUDIT_MESSAGE, exploratoryName)); |
| if (schedulerJobData.getJobDTO().isSyncStartRequired()) { |
| log.trace("Starting computational for exploratory {} for user {} by scheduler", exploratoryName, user); |
| final DataEngineType sparkCluster = DataEngineType.SPARK_STANDALONE; |
| final List<UserComputationalResource> compToBeStarted = |
| computationalDAO.findComputationalResourcesWithStatus(user, project, exploratoryName, STOPPED); |
| |
| compToBeStarted |
| .stream() |
| .filter(compResource -> shouldClusterBeStarted(sparkCluster, compResource)) |
| .forEach(comp -> startSpark(user, exploratoryName, comp.getComputationalName(), project)); |
| } |
| } |
| |
| private void terminateExploratory(SchedulerJobData job) { |
| final String user = job.getUser(); |
| final String project = job.getProject(); |
| final String expName = job.getExploratoryName(); |
| log.debug("Terminating exploratory {} for user {} by scheduler", expName, user); |
| exploratoryService.terminate(securityService.getUserInfoOffline(user), user, project, expName, String.format(AUDIT_MESSAGE, expName)); |
| } |
| |
| private void startSpark(String user, String expName, String compName, String project) { |
| log.debug("Starting exploratory {} computational {} for user {} by scheduler", expName, compName, user); |
| computationalService.startSparkCluster(securityService.getServiceAccountInfo(user), expName, compName, project, String.format(AUDIT_MESSAGE, expName)); |
| } |
| |
| private boolean shouldClusterBeStarted(DataEngineType sparkCluster, UserComputationalResource compResource) { |
| return Objects.nonNull(compResource.getSchedulerData()) && compResource.getSchedulerData().isSyncStartRequired() |
| && compResource.getImageName().equals(getDockerImageName(sparkCluster)); |
| } |
| |
| /** |
| * Performs bulk updating operation with scheduler data for corresponding to exploratory Spark clusters. |
| * All these resources will obtain data which is equal to exploratory's except 'stopping' operation (it will be |
| * performed automatically with notebook stopping since Spark resources have such feature). |
| * |
| * @param user user's name |
| * @param project project name |
| * @param exploratoryName name of exploratory resource |
| * @param dto scheduler job data. |
| */ |
| private void shareSchedulerJobDataToSparkClusters(String user, String project, String exploratoryName, SchedulerJobDTO dto) { |
| List<String> correspondingSparkClusters = computationalDAO.getComputationalResourcesWhereStatusIn(user, project, |
| singletonList(DataEngineType.SPARK_STANDALONE), |
| exploratoryName, STARTING, RUNNING, STOPPING, STOPPED); |
| SchedulerJobDTO dtoWithoutStopData = getSchedulerJobWithoutStopData(dto); |
| for (String sparkName : correspondingSparkClusters) { |
| log.debug("Updating computational resource {} affiliated with exploratory {} for user {} with new " + |
| "scheduler job data {}...", sparkName, exploratoryName, user, dtoWithoutStopData); |
| computationalDAO.updateSchedulerDataForComputationalResource(user, project, exploratoryName, |
| sparkName, dtoWithoutStopData); |
| } |
| } |
| |
| private List<SchedulerJobData> getExploratorySchedulersForStopping(OffsetDateTime currentDateTime) { |
| return schedulerJobDAO.getExploratorySchedulerDataWithStatus(RUNNING) |
| .stream() |
| .filter(canSchedulerForStoppingBeApplied(currentDateTime, true)) |
| .collect(Collectors.toList()); |
| } |
| |
| private List<SchedulerJobData> getExploratorySchedulersForStopping(OffsetDateTime currentDateTime, |
| boolean checkInactivity) { |
| final Date clusterMaxInactivityAllowedDate = |
| from(LocalDateTime.now().minusMinutes(ALLOWED_INACTIVITY_MINUTES).atZone(systemDefault()).toInstant()); |
| return schedulerJobDAO.getExploratorySchedulerWithStatusAndClusterLastActivityLessThan(RUNNING, |
| clusterMaxInactivityAllowedDate) |
| .stream() |
| .filter(canSchedulerForStoppingBeApplied(currentDateTime, false) |
| .or(schedulerJobData -> checkInactivity && exploratoryInactivityCondition(schedulerJobData))) |
| .collect(Collectors.toList()); |
| } |
| |
| private List<SchedulerJobData> getExploratorySchedulersForStarting(OffsetDateTime currentDateTime) { |
| return schedulerJobDAO.getExploratorySchedulerDataWithStatus(STOPPED) |
| .stream() |
| .filter(canSchedulerForStartingBeApplied(currentDateTime)) |
| .collect(Collectors.toList()); |
| } |
| |
| private List<SchedulerJobData> getComputationalSchedulersForStarting(OffsetDateTime currentDateTime) { |
| return schedulerJobDAO |
| .getComputationalSchedulerDataWithOneOfStatus(RUNNING, DataEngineType.SPARK_STANDALONE, STOPPED) |
| .stream() |
| .filter(canSchedulerForStartingBeApplied(currentDateTime)) |
| .collect(Collectors.toList()); |
| } |
| |
| private Predicate<SchedulerJobData> canSchedulerForStoppingBeApplied(OffsetDateTime currentDateTime, boolean usingOffset) { |
| return schedulerJobData -> shouldSchedulerBeExecuted(schedulerJobData.getJobDTO(), |
| currentDateTime, schedulerJobData.getJobDTO().getStopDaysRepeat(), |
| schedulerJobData.getJobDTO().getEndTime(), usingOffset); |
| } |
| |
| private Predicate<SchedulerJobData> canSchedulerForStartingBeApplied(OffsetDateTime currentDateTime) { |
| return schedulerJobData -> shouldSchedulerBeExecuted(schedulerJobData.getJobDTO(), |
| currentDateTime, schedulerJobData.getJobDTO().getStartDaysRepeat(), |
| schedulerJobData.getJobDTO().getStartTime(), false); |
| } |
| |
| private Predicate<SchedulerJobData> canSchedulerForTerminatingBeApplied(OffsetDateTime currentDateTime) { |
| return schedulerJobData -> shouldBeTerminated(currentDateTime, schedulerJobData); |
| } |
| |
| private boolean shouldBeTerminated(OffsetDateTime currentDateTime, SchedulerJobData schedulerJobData) { |
| final SchedulerJobDTO jobDTO = schedulerJobData.getJobDTO(); |
| final ZoneOffset timeZoneOffset = jobDTO.getTimeZoneOffset(); |
| final LocalDateTime convertedCurrentTime = localDateTimeAtZone(currentDateTime, timeZoneOffset); |
| final LocalDateTime terminateDateTime = jobDTO.getTerminateDateTime(); |
| return Objects.nonNull(terminateDateTime) && isSchedulerActive(jobDTO, convertedCurrentTime) && |
| convertedCurrentTime.equals(terminateDateTime.atOffset(timeZoneOffset).toLocalDateTime()); |
| } |
| |
| private List<SchedulerJobData> getComputationalSchedulersForStopping(OffsetDateTime currentDateTime) { |
| return schedulerJobDAO |
| .getComputationalSchedulerDataWithOneOfStatus(RUNNING, DataEngineType.SPARK_STANDALONE, RUNNING) |
| .stream() |
| .filter(canSchedulerForStoppingBeApplied(currentDateTime, true)) |
| .collect(Collectors.toList()); |
| } |
| |
| private List<SchedulerJobData> getComputationalSchedulersForStopping(OffsetDateTime currentDateTime, |
| boolean checkInactivity) { |
| return schedulerJobDAO |
| .getComputationalSchedulerDataWithOneOfStatus(RUNNING, DataEngineType.SPARK_STANDALONE, RUNNING) |
| .stream() |
| .filter(canSchedulerForStoppingBeApplied(currentDateTime, false) |
| .or(schedulerJobData -> checkInactivity && computationalInactivityCondition(schedulerJobData))) |
| .collect(Collectors.toList()); |
| } |
| |
| private boolean computationalInactivityCondition(SchedulerJobData jobData) { |
| final SchedulerJobDTO schedulerData = jobData.getJobDTO(); |
| return schedulerData.isCheckInactivityRequired() && computationalInactivityExceed(jobData, schedulerData); |
| } |
| |
| private boolean computationalInactivityExceed(SchedulerJobData schedulerJobData, SchedulerJobDTO schedulerData) { |
| final String projectName = schedulerJobData.getProject(); |
| final String explName = schedulerJobData.getExploratoryName(); |
| final String compName = schedulerJobData.getComputationalName(); |
| final String user = schedulerJobData.getUser(); |
| final UserComputationalResource c = computationalDAO.fetchComputationalFields(user, projectName, explName, compName); |
| final Long maxInactivity = schedulerData.getMaxInactivity(); |
| return inactivityCondition(maxInactivity, c.getStatus(), c.getLastActivity()); |
| } |
| |
| private boolean exploratoryInactivityCondition(SchedulerJobData jobData) { |
| final SchedulerJobDTO schedulerData = jobData.getJobDTO(); |
| return schedulerData.isCheckInactivityRequired() && exploratoryInactivityExceed(jobData, schedulerData); |
| } |
| |
| private boolean exploratoryInactivityExceed(SchedulerJobData schedulerJobData, SchedulerJobDTO schedulerData) { |
| final String project = schedulerJobData.getProject(); |
| final String expName = schedulerJobData.getExploratoryName(); |
| final String user = schedulerJobData.getUser(); |
| final UserInstanceDTO userInstanceDTO = exploratoryDAO.fetchExploratoryFields(user, project, expName, true); |
| final boolean canBeStopped = userInstanceDTO.getResources() |
| .stream() |
| .map(UserComputationalResource::getStatus) |
| .map(UserInstanceStatus::of) |
| .noneMatch(status -> status.in(TERMINATING, CONFIGURING, CREATING, CREATING)); |
| return canBeStopped && inactivityCondition(schedulerData.getMaxInactivity(), userInstanceDTO.getStatus(), |
| userInstanceDTO.getLastActivity()); |
| } |
| |
| private boolean inactivityCondition(Long maxInactivity, String status, LocalDateTime lastActivity) { |
| return UserInstanceStatus.RUNNING.toString().equals(status) && |
| Optional.ofNullable(lastActivity) |
| .map(la -> la.plusMinutes(maxInactivity).isBefore(LocalDateTime.now())) |
| .orElse(Boolean.FALSE); |
| } |
| |
| private void populateDefaultSchedulerValues(SchedulerJobDTO dto) { |
| if (Objects.isNull(dto.getBeginDate()) || StringUtils.isBlank(dto.getBeginDate().toString())) { |
| dto.setBeginDate(LocalDate.now()); |
| } |
| if (Objects.isNull(dto.getTimeZoneOffset()) || StringUtils.isBlank(dto.getTimeZoneOffset().toString())) { |
| dto.setTimeZoneOffset(OffsetDateTime.now(systemDefault()).getOffset()); |
| } |
| } |
| |
| private void validateExploratoryStatus(String user, String project, String exploratoryName) { |
| final UserInstanceDTO userInstance = exploratoryDAO.fetchExploratoryFields(user, project, exploratoryName); |
| validateResourceStatus(userInstance.getStatus()); |
| } |
| |
| private void validateComputationalStatus(String user, String project, String exploratoryName, String computationalName) { |
| final UserComputationalResource computationalResource = |
| computationalDAO.fetchComputationalFields(user, project, exploratoryName, computationalName); |
| final String computationalStatus = computationalResource.getStatus(); |
| validateResourceStatus(computationalStatus); |
| } |
| |
| private void validateResourceStatus(String resourceStatus) { |
| final UserInstanceStatus status = UserInstanceStatus.of(resourceStatus); |
| if (Objects.isNull(status) || status.in(UserInstanceStatus.TERMINATED, TERMINATING, |
| UserInstanceStatus.FAILED)) { |
| throw new ResourceInappropriateStateException(String.format("Can not create/update scheduler for user " + |
| "instance with status: %s", status)); |
| } |
| } |
| |
| private boolean shouldSchedulerBeExecuted(SchedulerJobDTO dto, OffsetDateTime dateTime, List<DayOfWeek> daysRepeat, |
| LocalTime time, boolean usingOffset) { |
| LocalDateTime convertedDateTime = localDateTimeAtZone(dateTime, dto.getTimeZoneOffset()); |
| return isSchedulerActive(dto, convertedDateTime) |
| && daysRepeat.contains(convertedDateTime.toLocalDate().getDayOfWeek()) |
| && timeFilter(time, convertedDateTime.toLocalTime(), usingOffset); |
| } |
| |
| private boolean timeFilter(LocalTime time, LocalTime convertedDateTime, boolean usingOffset) { |
| return usingOffset ? (time.isBefore(convertedDateTime) && time.isAfter(LocalDateTime.now().toLocalTime())) : |
| convertedDateTime.equals(time); |
| } |
| |
| private boolean isSchedulerActive(SchedulerJobDTO dto, LocalDateTime convertedDateTime) { |
| return !convertedDateTime.toLocalDate().isBefore(dto.getBeginDate()) |
| && finishDateAfterCurrentDate(dto, convertedDateTime); |
| } |
| |
| private LocalDateTime localDateTimeAtZone(OffsetDateTime dateTime, ZoneOffset timeZoneOffset) { |
| return dateTime.atZoneSameInstant(ZoneOffset.UTC) |
| .truncatedTo(ChronoUnit.MINUTES) |
| .withZoneSameInstant(timeZoneOffset) |
| .toLocalDateTime(); |
| } |
| |
| private boolean finishDateAfterCurrentDate(SchedulerJobDTO dto, LocalDateTime currentDateTime) { |
| return Objects.isNull(dto.getFinishDate()) || !currentDateTime.toLocalDate().isAfter(dto.getFinishDate()); |
| } |
| |
| private SchedulerJobDTO getSchedulerJobWithoutStopData(SchedulerJobDTO dto) { |
| SchedulerJobDTO convertedDto = new SchedulerJobDTO(); |
| convertedDto.setBeginDate(dto.getBeginDate()); |
| convertedDto.setFinishDate(dto.getFinishDate()); |
| convertedDto.setStartTime(dto.getStartTime()); |
| convertedDto.setStartDaysRepeat(dto.getStartDaysRepeat()); |
| convertedDto.setTerminateDateTime(dto.getTerminateDateTime()); |
| convertedDto.setTimeZoneOffset(dto.getTimeZoneOffset()); |
| convertedDto.setSyncStartRequired(dto.isSyncStartRequired()); |
| return convertedDto; |
| } |
| |
| } |
| |