| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.gobblin.service.modules.utils; |
| |
| import com.google.common.base.Optional; |
| import com.typesafe.config.Config; |
| import java.io.IOException; |
| import java.util.Map; |
| import lombok.Data; |
| import lombok.extern.slf4j.Slf4j; |
| import org.apache.gobblin.configuration.ConfigurationKeys; |
| import org.apache.gobblin.instrumented.Instrumented; |
| import org.apache.gobblin.metrics.event.EventSubmitter; |
| import org.apache.gobblin.metrics.event.TimingEvent; |
| import org.apache.gobblin.runtime.api.FlowSpec; |
| import org.apache.gobblin.service.modules.flow.SpecCompiler; |
| import org.apache.gobblin.service.modules.flowgraph.Dag; |
| import org.apache.gobblin.service.modules.orchestration.TimingEventUtils; |
| import org.apache.gobblin.service.modules.orchestration.UserQuotaManager; |
| import org.apache.gobblin.service.modules.spec.JobExecutionPlan; |
| import org.apache.gobblin.service.monitoring.FlowStatusGenerator; |
| import org.apache.gobblin.util.ConfigUtils; |
| |
| |
| /** |
| * Helper class with functionality meant to be re-used between the DagManager and Orchestrator when launching |
| * executions of a flow spec. In the common case, the Orchestrator receives a flow to orchestrate, performs necessary |
| * validations, and forwards the execution responsibility to the DagManager. The DagManager's responsibility is to |
| * carry out any flow action requests. However, with launch executions now being stored in the DagActionStateStore, on |
| * restart or leadership change the DagManager has to perform validations before executing any launch actions the |
| * previous leader was unable to complete. Rather than duplicating the code or introducing a circular dependency between |
| * the DagManager and Orchestrator, this class is utilized to store the common functionality. It is stateful, |
| * requiring all stateful pieces to be passed as input from the caller upon instantiating the helper. |
| * Note: We expect further refactoring to be done to the DagManager in later stage of multi-active development, so we do |
| * not attempt major reorganization as abstractions may change. |
| */ |
| @Slf4j |
| @Data |
| public final class FlowCompilationValidationHelper { |
| private final SharedFlowMetricsSingleton sharedFlowMetricsSingleton; |
| private final SpecCompiler specCompiler; |
| private final UserQuotaManager quotaManager; |
| private final EventSubmitter eventSubmitter; |
| private final FlowStatusGenerator flowStatusGenerator; |
| private final boolean isFlowConcurrencyEnabled; |
| |
| /** |
| * For a given a flowSpec, verifies that an execution is allowed (in case there is an ongoing execution) and the |
| * flowspec can be compiled. If the pre-conditions hold, then a JobExecutionPlan is constructed and returned to the |
| * caller. |
| * @param flowSpec |
| * @return jobExecutionPlan dag if one can be constructed for the given flowSpec |
| */ |
| public Optional<Dag<JobExecutionPlan>> createExecutionPlanIfValid(FlowSpec flowSpec) |
| throws IOException, InterruptedException { |
| Config flowConfig = flowSpec.getConfig(); |
| String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY); |
| String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY); |
| |
| //Wait for the SpecCompiler to become healthy. |
| specCompiler.awaitHealthy(); |
| |
| TimingEvent flowCompilationTimer = new TimingEvent(this.eventSubmitter, TimingEvent.FlowTimings.FLOW_COMPILED); |
| Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata(flowSpec); |
| Optional<Dag<JobExecutionPlan>> jobExecutionPlanDagOptional = |
| validateAndHandleConcurrentExecution(flowConfig, flowSpec, flowGroup, flowName, flowMetadata); |
| |
| if (!jobExecutionPlanDagOptional.isPresent()) { |
| return Optional.absent(); |
| } |
| |
| if (jobExecutionPlanDagOptional.get().isEmpty()) { |
| populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec, flowMetadata); |
| return Optional.absent(); |
| } |
| |
| flowCompilationTimer.stop(flowMetadata); |
| return jobExecutionPlanDagOptional; |
| } |
| |
| /** |
| * Checks if flowSpec disallows concurrent executions, and if so then checks if another instance of the flow is |
| * already running and emits a FLOW FAILED event. Otherwise, this check passes. |
| * @return Optional<Dag<JobExecutionPlan>> if caller allowed to execute flow and compile flowSpec, else Optional.absent() |
| * @throws IOException |
| */ |
| public Optional<Dag<JobExecutionPlan>> validateAndHandleConcurrentExecution(Config flowConfig, FlowSpec flowSpec, |
| String flowGroup, String flowName, Map<String,String> flowMetadata) throws IOException { |
| boolean allowConcurrentExecution = ConfigUtils.getBoolean(flowConfig, |
| ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, isFlowConcurrencyEnabled); |
| |
| Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(flowSpec); |
| if (jobExecutionPlanDag.isEmpty()) { |
| return Optional.absent(); |
| } |
| addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag); |
| |
| if (isExecutionPermitted(flowStatusGenerator, flowName, flowGroup, allowConcurrentExecution, |
| Long.parseLong(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD)))) { |
| return Optional.fromNullable(jobExecutionPlanDag); |
| } else { |
| log.warn("Another instance of flowGroup: {}, flowName: {} running; Skipping flow execution since " |
| + "concurrent executions are disabled for this flow.", flowGroup, flowName); |
| sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(flowSpec, |
| SharedFlowMetricsSingleton.CompiledState.SKIPPED); |
| Instrumented.markMeter(sharedFlowMetricsSingleton.getSkippedFlowsMeter()); |
| if (!flowSpec.isScheduled()) { |
| // For ad-hoc flow, we might already increase quota, we need to decrease here |
| for (Dag.DagNode dagNode : jobExecutionPlanDag.getStartNodes()) { |
| quotaManager.releaseQuota(dagNode); |
| } |
| } |
| // Send FLOW_FAILED event |
| flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow failed because another instance is running and concurrent " |
| + "executions are disabled. Set flow.allowConcurrentExecution to true in the flowSpec to change this behaviour."); |
| new TimingEvent(eventSubmitter, TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata); |
| return Optional.absent(); |
| } |
| } |
| |
| /** |
| * Check if a FlowSpec instance is allowed to run. |
| * |
| * @param flowName |
| * @param flowGroup |
| * @param allowConcurrentExecution |
| * @return true if the {@link FlowSpec} allows concurrent executions or if no other instance of the flow is currently RUNNING. |
| */ |
| private boolean isExecutionPermitted(FlowStatusGenerator flowStatusGenerator, String flowName, String flowGroup, |
| boolean allowConcurrentExecution, long flowExecutionId) { |
| return allowConcurrentExecution || !flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId); |
| } |
| |
| /** |
| * Abstraction used to populate the message of and emit a FlowCompileFailed event for the Orchestrator. |
| * @param flowSpec |
| * @param flowMetadata |
| */ |
| public static void populateFlowCompilationFailedEventMessage(EventSubmitter eventSubmitter, |
| FlowSpec flowSpec, Map<String, String> flowMetadata) { |
| // For scheduled flows, we do not insert the flowExecutionId into the FlowSpec. As a result, if the flow |
| // compilation fails (i.e. we are unable to find a path), the metadata will not have flowExecutionId. |
| // In this case, the current time is used as the flow executionId. |
| flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, |
| Long.toString(System.currentTimeMillis())); |
| |
| String message = "Flow was not compiled successfully."; |
| if (!flowSpec.getCompilationErrors().isEmpty()) { |
| message = message + " Compilation errors encountered: " + flowSpec.getCompilationErrors(); |
| } |
| flowMetadata.put(TimingEvent.METADATA_MESSAGE, message); |
| |
| new TimingEvent(eventSubmitter, TimingEvent.FlowTimings.FLOW_COMPILE_FAILED).stop(flowMetadata); |
| } |
| |
| /** |
| * If it is a scheduled flow run without multi-active scheduler configuration (where the FlowSpec does not have a |
| * flowExecutionId) and the flow compilation is successful, retrieve flowExecutionId from the JobSpec. |
| */ |
| public static void addFlowExecutionIdIfAbsent(Map<String,String> flowMetadata, |
| Dag<JobExecutionPlan> jobExecutionPlanDag) { |
| flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, |
| jobExecutionPlanDag.getNodes().get(0).getValue().getJobSpec().getConfigAsProperties().getProperty( |
| ConfigurationKeys.FLOW_EXECUTION_ID_KEY)); |
| } |
| } |