blob: cde02b3ef29aadde67b4e31cea9ce2f7c2ff4260 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gobblin.service.modules.orchestration;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import javax.annotation.Nonnull;
import lombok.Getter;
import lombok.Setter;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.instrumented.Instrumentable;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.ContextAwareGauge;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.RootMetricContext;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecCatalogListener;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flow.SpecCompiler;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
/**
* Orchestrator that is a {@link SpecCatalogListener}. It listens to changes
* to {@link TopologyCatalog} and updates {@link SpecCompiler} state.
*/
@Alpha
public class Orchestrator implements SpecCatalogListener, Instrumentable {
private static final String JOB_STATUS_RETRIEVER_CLASS_KEY = "jobStatusRetriever.class";
protected final Logger _log;
protected final SpecCompiler specCompiler;
protected final Optional<TopologyCatalog> topologyCatalog;
protected final Optional<DagManager> dagManager;
protected final MetricContext metricContext;
protected final Optional<EventSubmitter> eventSubmitter;
private final boolean flowConcurrencyFlag;
@Getter
private Optional<Meter> flowOrchestrationSuccessFulMeter;
@Getter
private Optional<Meter> flowOrchestrationFailedMeter;
@Getter
private Optional<Timer> flowOrchestrationTimer;
@Setter
private FlowStatusGenerator flowStatusGenerator;
private final ClassAliasResolver<SpecCompiler> aliasResolver;
private Map<String, FlowCompiledState> flowGauges = Maps.newHashMap();
public Orchestrator(Config config, Optional<TopologyCatalog> topologyCatalog, Optional<DagManager> dagManager, Optional<Logger> log,
boolean instrumentationEnabled) {
_log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class);
this.topologyCatalog = topologyCatalog;
this.dagManager = dagManager;
try {
String specCompilerClassName = ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS;
if (config.hasPath(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY)) {
specCompilerClassName = config.getString(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY);
}
_log.info("Using specCompiler class name/alias " + specCompilerClassName);
this.specCompiler = (SpecCompiler) ConstructorUtils.invokeConstructor(Class.forName(this.aliasResolver.resolve(
specCompilerClassName)), config);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException
| ClassNotFoundException e) {
throw new RuntimeException(e);
}
//At this point, the TopologySpecMap is initialized by the SpecCompiler. Pass the TopologySpecMap to the DagManager.
if (this.dagManager.isPresent()) {
this.dagManager.get().setTopologySpecMap(getSpecCompiler().getTopologySpecMap());
}
if (instrumentationEnabled) {
this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), this.specCompiler.getClass());
this.flowOrchestrationSuccessFulMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_SUCCESSFUL_METER));
this.flowOrchestrationFailedMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_FAILED_METER));
this.flowOrchestrationTimer = Optional.of(this.metricContext.timer(ServiceMetricNames.FLOW_ORCHESTRATION_TIMER));
this.eventSubmitter = Optional.of(new EventSubmitter.Builder(this.metricContext, "org.apache.gobblin.service").build());
} else {
this.metricContext = null;
this.flowOrchestrationSuccessFulMeter = Optional.absent();
this.flowOrchestrationFailedMeter = Optional.absent();
this.flowOrchestrationTimer = Optional.absent();
this.eventSubmitter = Optional.absent();
}
this.flowConcurrencyFlag = ConfigUtils.getBoolean(config, ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED,
ServiceConfigKeys.DEFAULT_FLOW_CONCURRENCY_ALLOWED);
}
public Orchestrator(Config config, Optional<TopologyCatalog> topologyCatalog, Optional<DagManager> dagManager, Optional<Logger> log) {
this(config, topologyCatalog, dagManager, log, true);
}
public Orchestrator(Config config, Optional<TopologyCatalog> topologyCatalog, Optional<DagManager> dagManager, Logger log) {
this(config, topologyCatalog, dagManager, Optional.of(log));
}
public Orchestrator(Config config, Logger log) {
this(config, Optional.<TopologyCatalog>absent(), Optional.<DagManager>absent(), Optional.of(log));
}
/** Constructor with no logging */
public Orchestrator(Config config, Optional<TopologyCatalog> topologyCatalog) {
this(config, topologyCatalog, Optional.<DagManager>absent(), Optional.<Logger>absent());
}
public Orchestrator(Config config) {
this(config, Optional.<TopologyCatalog>absent(), Optional.<DagManager>absent(), Optional.<Logger>absent());
}
@VisibleForTesting
public SpecCompiler getSpecCompiler() {
return this.specCompiler;
}
/** {@inheritDoc} */
@Override
public AddSpecResponse onAddSpec(Spec addedSpec) {
if (addedSpec instanceof TopologySpec) {
_log.info("New Spec detected of type TopologySpec: " + addedSpec);
this.specCompiler.onAddSpec(addedSpec);
}
return new AddSpecResponse(null);
}
public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) {
onDeleteSpec(deletedSpecURI, deletedSpecVersion, new Properties());
}
/** {@inheritDoc} */
@Override
public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion, Properties headers) {
_log.info("Spec deletion detected: " + deletedSpecURI + "/" + deletedSpecVersion);
if (topologyCatalog.isPresent()) {
this.specCompiler.onDeleteSpec(deletedSpecURI, deletedSpecVersion, headers);
}
}
/** {@inheritDoc} */
@Override
public void onUpdateSpec(Spec updatedSpec) {
_log.info("Spec changed: " + updatedSpec);
if (!(updatedSpec instanceof TopologySpec)) {
return;
}
try {
onDeleteSpec(updatedSpec.getUri(), updatedSpec.getVersion());
} catch (Exception e) {
_log.error("Failed to update Spec: " + updatedSpec, e);
}
try {
onAddSpec(updatedSpec);
} catch (Exception e) {
_log.error("Failed to update Spec: " + updatedSpec, e);
}
}
public void orchestrate(Spec spec) throws Exception {
// Add below waiting because TopologyCatalog and FlowCatalog service can be launched at the same time
this.topologyCatalog.get().getInitComplete().await();
//Wait for the SpecCompiler to become healthy.
this.getSpecCompiler().awaitHealthy();
long startTime = System.nanoTime();
if (spec instanceof FlowSpec) {
Config flowConfig = ((FlowSpec) spec).getConfig();
String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
if (!flowGauges.containsKey(spec.getUri().toString())) {
String flowCompiledGaugeName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, flowGroup, flowName, ServiceMetricNames.COMPILED);
flowGauges.put(spec.getUri().toString(), new FlowCompiledState());
ContextAwareGauge<Integer> gauge = RootMetricContext.get().newContextAwareGauge(flowCompiledGaugeName, () -> flowGauges.get(spec.getUri().toString()).state.value);
RootMetricContext.get().register(flowCompiledGaugeName, gauge);
}
//If the FlowSpec disallows concurrent executions, then check if another instance of the flow is already
//running. If so, return immediately.
boolean allowConcurrentExecution = ConfigUtils
.getBoolean(flowConfig, ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, this.flowConcurrencyFlag);
if (!canRun(flowName, flowGroup, allowConcurrentExecution)) {
_log.warn("Another instance of flowGroup: {}, flowName: {} running; Skipping flow execution since "
+ "concurrent executions are disabled for this flow.", flowGroup, flowName);
flowGauges.get(spec.getUri().toString()).setState(CompiledState.SKIPPED);
// Send FLOW_FAILED event
Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata((FlowSpec) spec);
flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow failed because another instance is running and concurrent "
+ "executions are disabled. Set flow.allowConcurrentExecution to true in the flow spec to change this behaviour.");
if (this.eventSubmitter.isPresent()) {
this.eventSubmitter.get().getTimingEvent(TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
}
return;
}
TimingEvent flowCompilationTimer = this.eventSubmitter.isPresent()
? this.eventSubmitter.get().getTimingEvent(TimingEvent.FlowTimings.FLOW_COMPILED)
: null;
Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata((FlowSpec) spec);
if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
// For scheduled flows, we do not insert the flowExecutionId into the FlowSpec. As a result, if the flow
// compilation fails (i.e. we are unable to find a path), the metadata will not have flowExecutionId.
// In this case, the current time is used as the flow executionId.
flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
Long.toString(System.currentTimeMillis()));
String message = "Flow was not compiled successfully.";
if (!((FlowSpec) spec).getCompilationErrors().isEmpty()) {
message = message + " Compilation errors encountered: " + ((FlowSpec) spec).getCompilationErrors();
}
flowMetadata.put(TimingEvent.METADATA_MESSAGE, message);
TimingEvent flowCompileFailedTimer = this.eventSubmitter.isPresent() ? this.eventSubmitter.get()
.getTimingEvent(TimingEvent.FlowTimings.FLOW_COMPILE_FAILED) : null;
Instrumented.markMeter(this.flowOrchestrationFailedMeter);
flowGauges.get(spec.getUri().toString()).setState(CompiledState.FAILED);
_log.warn("Cannot determine an executor to run on for Spec: " + spec);
if (flowCompileFailedTimer != null) {
flowCompileFailedTimer.stop(flowMetadata);
}
return;
} else {
flowGauges.get(spec.getUri().toString()).setState(CompiledState.SUCCESSFUL);
}
//If it is a scheduled flow (and hence, does not have flowExecutionId in the FlowSpec) and the flow compilation is successful,
// retrieve the flowExecutionId from the JobSpec.
flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
jobExecutionPlanDag.getNodes().get(0).getValue().getJobSpec().getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
if (flowCompilationTimer != null) {
flowCompilationTimer.stop(flowMetadata);
}
if (this.dagManager.isPresent()) {
//Send the dag to the DagManager.
this.dagManager.get().addDag(jobExecutionPlanDag, true, true);
} else {
// Schedule all compiled JobSpecs on their respective Executor
for (Dag.DagNode<JobExecutionPlan> dagNode : jobExecutionPlanDag.getNodes()) {
DagManagerUtils.incrementJobAttempt(dagNode);
JobExecutionPlan jobExecutionPlan = dagNode.getValue();
// Run this spec on selected executor
SpecProducer producer = null;
try {
producer = jobExecutionPlan.getSpecExecutor().getProducer().get();
Spec jobSpec = jobExecutionPlan.getJobSpec();
if (!((JobSpec) jobSpec).getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
_log.warn("JobSpec does not contain flowExecutionId.");
}
Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(flowMetadata, jobExecutionPlan);
_log.info(String.format("Going to orchestrate JobSpec: %s on Executor: %s", jobSpec, producer));
TimingEvent jobOrchestrationTimer = this.eventSubmitter.isPresent() ? this.eventSubmitter.get().
getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED) : null;
producer.addSpec(jobSpec);
if (jobOrchestrationTimer != null) {
jobOrchestrationTimer.stop(jobMetadata);
}
} catch (Exception e) {
_log.error("Cannot successfully setup spec: " + jobExecutionPlan.getJobSpec() + " on executor: " + producer
+ " for flow: " + spec, e);
}
}
}
} else {
Instrumented.markMeter(this.flowOrchestrationFailedMeter);
throw new RuntimeException("Spec not of type FlowSpec, cannot orchestrate: " + spec);
}
Instrumented.markMeter(this.flowOrchestrationSuccessFulMeter);
Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
}
/**
* Check if a FlowSpec instance is allowed to run.
*
* @param flowName
* @param flowGroup
* @param allowConcurrentExecution
* @return true if the {@link FlowSpec} allows concurrent executions or if no other instance of the flow is currently RUNNING.
*/
private boolean canRun(String flowName, String flowGroup, boolean allowConcurrentExecution) {
if (allowConcurrentExecution) {
return true;
} else {
return !flowStatusGenerator.isFlowRunning(flowName, flowGroup);
}
}
public void remove(Spec spec, Properties headers) throws IOException {
// TODO: Evolve logic to cache and reuse previously compiled JobSpecs
// .. this will work for Identity compiler but not always for multi-hop.
// Note: Current logic assumes compilation is consistent between all executions
if (spec instanceof FlowSpec) {
//Send the dag to the DagManager to stop it.
//Also send it to the SpecProducer to do any cleanup tasks on SpecExecutor.
if (this.dagManager.isPresent()) {
_log.info("Forwarding cancel request for flow URI {} to DagManager.", spec.getUri());
this.dagManager.get().stopDag(spec.getUri());
}
// We need to recompile the flow to find the spec producer,
// If compilation result is different, its remove request can go to some different spec producer
deleteFromExecutor(spec, headers);
} else {
throw new RuntimeException("Spec not of type FlowSpec, cannot delete: " + spec);
}
}
private void deleteFromExecutor(Spec spec, Properties headers) {
Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
if (jobExecutionPlanDag.isEmpty()) {
_log.warn("Cannot determine an executor to delete Spec: " + spec);
return;
}
// Delete all compiled JobSpecs on their respective Executor
for (Dag.DagNode<JobExecutionPlan> dagNode: jobExecutionPlanDag.getNodes()) {
JobExecutionPlan jobExecutionPlan = dagNode.getValue();
Spec jobSpec = jobExecutionPlan.getJobSpec();
try {
SpecProducer<Spec> producer = jobExecutionPlan.getSpecExecutor().getProducer().get();
_log.info(String.format("Going to delete JobSpec: %s on Executor: %s", jobSpec, producer));
producer.deleteSpec(jobSpec.getUri(), headers);
} catch (Exception e) {
_log.error(String.format("Could not delete JobSpec: %s for flow: %s", jobSpec, spec), e);
}
}
}
private FlowStatusGenerator buildFlowStatusGenerator(Config config) {
JobStatusRetriever jobStatusRetriever;
try {
Class jobStatusRetrieverClass = Class.forName(ConfigUtils.getString(config, JOB_STATUS_RETRIEVER_CLASS_KEY, FsJobStatusRetriever.class.getName()));
jobStatusRetriever =
(JobStatusRetriever) GobblinConstructorUtils.invokeLongestConstructor(jobStatusRetrieverClass, config);
} catch (ReflectiveOperationException e) {
_log.error("Exception encountered when instantiating JobStatusRetriever");
throw new RuntimeException(e);
}
return FlowStatusGenerator.builder().jobStatusRetriever(jobStatusRetriever).build();
}
@Nonnull
@Override
public MetricContext getMetricContext() {
return this.metricContext;
}
@Override
public boolean isInstrumentationEnabled() {
return null != this.metricContext;
}
@Override
public List<Tag<?>> generateTags(State state) {
return Collections.emptyList();
}
@Override
public void switchMetricContext(List<Tag<?>> tags) {
throw new UnsupportedOperationException();
}
@Override
public void switchMetricContext(MetricContext context) {
throw new UnsupportedOperationException();
}
@Setter
private static class FlowCompiledState {
private CompiledState state = CompiledState.UNKNOWN;
}
private enum CompiledState {
FAILED(-1),
UNKNOWN(0),
SUCCESSFUL(1),
SKIPPED(2);
public int value;
CompiledState(int value) {
this.value = value;
}
}
}