blob: cd2de788ba30d73af674ad50556ea2565f2dfa86 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gobblin.service.modules.flow;
import java.lang.reflect.InvocationTargetException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ServiceManager;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValueFactory;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobTemplate;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.DataNode;
import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
import org.apache.gobblin.service.modules.flowgraph.FlowGraphMonitor;
import org.apache.gobblin.service.modules.flowgraph.pathfinder.PathFinder;
import org.apache.gobblin.service.modules.restli.FlowConfigUtils;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.modules.template_catalog.ObservingFSFlowEdgeTemplateCatalog;
import org.apache.gobblin.service.modules.template_catalog.UpdatableFSFlowTemplateCatalog;
import org.apache.gobblin.service.monitoring.GitFlowGraphMonitor;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
/**
 * Takes in a logical {@link Spec}, i.e. a flow, and compiles the corresponding materialized job {@link Spec}s
 * together with their mapping to a {@link SpecExecutor}.
 */
@Alpha
@Slf4j
public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
  // Atomic reference so a flowgraph refresh is swapped in as a single unit and path
  // compilation never observes a partially-updated graph.
  private AtomicReference<FlowGraph> flowGraph;
  @Getter
  private ServiceManager serviceManager;
  // Counted down by the flowgraph monitor once the initial graph load completes; see awaitHealthy().
  @Getter
  private CountDownLatch initComplete = new CountDownLatch(1);
  private FlowGraphMonitor flowGraphMonitor;
  // Fair lock: flowgraph writers (monitor updates) are not starved by a steady stream of compilations.
  private ReadWriteLock rwLock = new ReentrantReadWriteLock(true);
  private DataMovementAuthorizer dataMovementAuthorizer;
  // A map to hold aliases of data nodes, e.g. gobblin.service.datanode.aliases.map=node1-dev:node1,node1-stg:node1,node1-prod:node1
  private Map<String, String> dataNodeAliasMap = new HashMap<>();
  public static final String DATA_NODE_ID_TO_ALIAS_MAP = ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX + "datanode.aliases.map";

  public MultiHopFlowCompiler(Config config) {
    this(config, true);
  }

  public MultiHopFlowCompiler(Config config, boolean instrumentationEnabled) {
    this(config, Optional.<Logger>absent(), instrumentationEnabled);
  }

  public MultiHopFlowCompiler(Config config, Optional<Logger> log) {
    this(config, log, true);
  }

  /**
   * Constructs a compiler around a pre-built {@link FlowGraph} reference, skipping template-catalog and
   * flowgraph-monitor setup entirely. Primarily useful for tests or callers that manage the graph themselves.
   */
  public MultiHopFlowCompiler(Config config, AtomicReference<FlowGraph> flowGraph) {
    super(config, Optional.absent(), true);
    this.flowGraph = flowGraph;
    this.dataMovementAuthorizer = new NoopDataMovementAuthorizer(config);
  }

  /**
   * Full constructor: wires up the data-node alias map, the flow template catalog, the
   * {@link DataMovementAuthorizer}, and the {@link FlowGraphMonitor}, then starts the latter two
   * under a {@link ServiceManager}.
   *
   * @param config                 service configuration
   * @param log                    optional logger forwarded to the base compiler
   * @param instrumentationEnabled whether metrics instrumentation is enabled
   */
  public MultiHopFlowCompiler(Config config, Optional<Logger> log, boolean instrumentationEnabled) {
    super(config, log, instrumentationEnabled);
    try {
      this.dataNodeAliasMap = config.hasPath(DATA_NODE_ID_TO_ALIAS_MAP)
          ? Splitter.on(",").withKeyValueSeparator(":").split(config.getString(DATA_NODE_ID_TO_ALIAS_MAP))
          : new HashMap<>();
    } catch (RuntimeException e) {
      // A malformed alias map should not prevent startup; compile without aliases instead.
      MultiHopFlowCompiler.log.warn("Exception reading data node alias map, ignoring it.", e);
    }
    // Use atomic reference to avoid partial flowgraph upgrades during path compilation.
    this.flowGraph = new AtomicReference<>(new BaseFlowGraph(dataNodeAliasMap));

    Optional<? extends UpdatableFSFlowTemplateCatalog> flowTemplateCatalog;
    if (config.hasPath(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)
        && StringUtils.isNotBlank(config.getString(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY))) {
      String flowTemplateCatalogClassName = ConfigUtils.getString(this.config, ServiceConfigKeys.TEMPLATE_CATALOGS_CLASS_KEY,
          ObservingFSFlowEdgeTemplateCatalog.class.getCanonicalName());
      try {
        // The catalog shares rwLock with this compiler so template refreshes and compilations are serialized.
        flowTemplateCatalog = Optional.of(
            (UpdatableFSFlowTemplateCatalog) ConstructorUtils.invokeConstructor(Class.forName(new ClassAliasResolver<>(UpdatableFSFlowTemplateCatalog.class)
                .resolve(flowTemplateCatalogClassName)), config, rwLock));
      } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException | ClassNotFoundException e) {
        // Report the class that actually failed to instantiate, not this compiler class.
        throw new RuntimeException("Cannot instantiate flow template catalog " + flowTemplateCatalogClassName, e);
      }
    } else {
      // Without a template catalog there is nothing for the monitor/catalog services to do.
      return;
    }

    Config gitFlowGraphConfig = this.config;
    if (this.config.hasPath(ConfigurationKeys.ENCRYPT_KEY_LOC)) {
      //Add encrypt.key.loc config to the config passed to GitFlowGraphMonitor
      gitFlowGraphConfig = this.config
          .withValue(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "." + ConfigurationKeys.ENCRYPT_KEY_LOC, config.getValue(ConfigurationKeys.ENCRYPT_KEY_LOC));
    }

    try {
      String dataMovementAuthorizerClassName = ConfigUtils.getString(this.config, ServiceConfigKeys.DATA_MOVEMENT_AUTHORIZER_CLASS,
          NoopDataMovementAuthorizer.class.getCanonicalName());
      this.dataMovementAuthorizer = (DataMovementAuthorizer) ConstructorUtils.invokeConstructor(
          Class.forName(new ClassAliasResolver<>(DataMovementAuthorizer.class).resolve(dataMovementAuthorizerClassName)), this.config);
    } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException | ClassNotFoundException e) {
      throw new RuntimeException(e);
    }

    try {
      String flowGraphMonitorClassName = ConfigUtils.getString(this.config, ServiceConfigKeys.GOBBLIN_SERVICE_FLOWGRAPH_CLASS_KEY,
          GitFlowGraphMonitor.class.getCanonicalName());
      this.flowGraphMonitor = (FlowGraphMonitor) ConstructorUtils.invokeConstructor(Class.forName(new ClassAliasResolver<>(FlowGraphMonitor.class).resolve(
          flowGraphMonitorClassName)), gitFlowGraphConfig, flowTemplateCatalog, this, this.topologySpecMap, this.getInitComplete(), instrumentationEnabled);
    } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException | ClassNotFoundException e) {
      throw new RuntimeException(e);
    }

    // Only an ObservingFSFlowEdgeTemplateCatalog is itself a Service that needs to be managed.
    this.serviceManager = (flowTemplateCatalog.isPresent() && flowTemplateCatalog.get() instanceof ObservingFSFlowEdgeTemplateCatalog)
        ? new ServiceManager(Lists.newArrayList(this.flowGraphMonitor, flowTemplateCatalog.get()))
        : new ServiceManager(Lists.newArrayList(this.flowGraphMonitor));
    addShutdownHook();

    //Start the git flow graph monitor
    try {
      this.serviceManager.startAsync().awaitHealthy(5, TimeUnit.SECONDS);
    } catch (TimeoutException te) {
      MultiHopFlowCompiler.log.error("Timed out while waiting for the service manager to start up", te);
      throw new RuntimeException(te);
    }
  }

  /**
   * Mark the {@link SpecCompiler} as active. This in turn activates the {@link GitFlowGraphMonitor}, allowing to start polling
   * and processing changes
   * @param active
   */
  @Override
  public void setActive(boolean active) {
    super.setActive(active);
    if (this.flowGraphMonitor != null) {
      this.flowGraphMonitor.setActive(active);
    }
  }

  /**
   * Blocks until the flowgraph monitor signals (via {@code initComplete}) that the initial graph load is done.
   */
  @Override
  public void awaitHealthy() throws InterruptedException {
    if (this.getInitComplete().getCount() > 0) {
      log.info("Waiting for the MultiHopFlowCompiler to become healthy..");
      this.getInitComplete().await();
      log.info("The MultihopFlowCompiler is healthy and ready to orchestrate flows.");
    }
  }

  /**
   * @param spec an instance of {@link FlowSpec}.
   * @return A DAG of {@link JobExecutionPlan}s, which encapsulates the compiled {@link org.apache.gobblin.runtime.api.JobSpec}s
   * together with the {@link SpecExecutor} where the job can be executed; when compilation fails, return `null`, and also add a
   * {@link org.apache.gobblin.runtime.api.FlowSpec.CompilationError} to `spec` (after casting to a {@link FlowSpec})
   */
  @Override
  public Dag<JobExecutionPlan> compileFlow(Spec spec) {
    Preconditions.checkNotNull(spec);
    Preconditions.checkArgument(spec instanceof FlowSpec, "MultiHopFlowCompiler only accepts FlowSpecs");

    // Snapshot the graph once so the whole compilation sees a consistent flowgraph.
    FlowGraph graph = this.flowGraph.get();
    long startTime = System.nanoTime();

    FlowSpec flowSpec = (FlowSpec) spec;
    String source = FlowConfigUtils.getDataNode(flowSpec.getConfig(), ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, this.dataNodeAliasMap);
    String destination = FlowConfigUtils.getDataNode(flowSpec.getConfig(), ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, this.dataNodeAliasMap);

    DataNode sourceNode = graph.getNode(source);
    if (sourceNode == null) {
      flowSpec.addCompilationError(source, destination, String.format("Flowgraph does not have a node with id %s", source));
      return null;
    }
    List<String> destNodeIds = FlowConfigUtils.getDataNodes(flowSpec.getConfig(), ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, this.dataNodeAliasMap);
    List<DataNode> destNodes = destNodeIds.stream().map(graph::getNode).collect(Collectors.toList());
    if (destNodes.contains(null)) {
      flowSpec.addCompilationError(source, destination, String.format("Flowgraph does not have a node with id %s", destNodeIds.get(destNodes.indexOf(null))));
      return null;
    }
    log.info("Compiling flow for source: {} and destination: {}", source, destination);

    List<FlowSpec> flowSpecs = splitFlowSpec(flowSpec);
    Dag<JobExecutionPlan> jobExecutionPlanDag = new Dag<>(new ArrayList<>());
    // Acquire the read lock BEFORE the try block: if lock() were to fail inside the try,
    // the finally would attempt to unlock a lock this thread does not hold.
    this.rwLock.readLock().lock();
    try {
      for (FlowSpec datasetFlowSpec : flowSpecs) {
        for (DataNode destNode : destNodes) {
          long authStartTime = System.nanoTime();
          try {
            boolean authorized = this.dataMovementAuthorizer.isMovementAuthorized(flowSpec, sourceNode, destNode);
            Instrumented.updateTimer(dataAuthorizationTimer, System.nanoTime() - authStartTime, TimeUnit.NANOSECONDS);
            if (!authorized) {
              String message = String.format("Data movement is not authorized for flow: %s, source: %s, destination: %s",
                  flowSpec.getUri().toString(), source, destination);
              log.error(message);
              datasetFlowSpec.addCompilationError(source, destination, message);
              return null;
            }
          } catch (Exception e) {
            // An authorizer failure is treated as a compilation failure, not as a denial.
            Instrumented.markMeter(flowCompilationFailedMeter);
            datasetFlowSpec.addCompilationError(source, destination, Throwables.getStackTraceAsString(e));
            return null;
          }
        }

        //Compute the path from source to destination.
        FlowGraphPath flowGraphPath = graph.findPath(datasetFlowSpec);
        if (flowGraphPath != null) {
          //Convert the path into a Dag of JobExecutionPlans.
          jobExecutionPlanDag = jobExecutionPlanDag.merge(flowGraphPath.asDag(this.config));
        }
      }

      if (jobExecutionPlanDag.isEmpty()) {
        Instrumented.markMeter(flowCompilationFailedMeter);
        String message = String.format("No path found from source: %s and destination: %s", source, destination);
        log.info(message);
        // Only record the generic "no path" error if no higher-priority error was already recorded.
        if (flowSpec.getCompilationErrors().stream().noneMatch(compilationError -> compilationError.errorPriority == 0)) {
          flowSpec.addCompilationError(source, destination, message);
        }
        return null;
      }
    } catch (PathFinder.PathFinderException | SpecNotFoundException | JobTemplate.TemplateException | URISyntaxException | ReflectiveOperationException e) {
      Instrumented.markMeter(flowCompilationFailedMeter);
      String message = String.format("Exception encountered while compiling flow for source: %s and destination: %s, %s", source, destination, Throwables.getStackTraceAsString(e));
      log.error(message, e);
      flowSpec.addCompilationError(source, destination, message);
      return null;
    } finally {
      this.rwLock.readLock().unlock();
    }

    Instrumented.markMeter(flowCompilationSuccessFulMeter);
    Instrumented.updateTimer(flowCompilationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
    // Clear compilation errors now that compilation is successful
    flowSpec.clearCompilationErrors();
    return jobExecutionPlanDag;
  }

  public void setFlowGraph(FlowGraph flowGraph) {
    this.flowGraph.set(flowGraph);
  }

  /**
   * If {@link FlowSpec} has {@link ConfigurationKeys#DATASET_SUBPATHS_KEY}, split it into multiple flowSpecs using a
   * provided base input and base output path to generate multiple source/destination paths.
   */
  private static List<FlowSpec> splitFlowSpec(FlowSpec flowSpec) {
    // Pin a single execution id so all split specs belong to the same flow execution.
    long flowExecutionId = FlowUtils.getOrCreateFlowExecutionId(flowSpec);
    List<FlowSpec> flowSpecs = new ArrayList<>();
    Config flowConfig = flowSpec.getConfig();
    if (flowConfig.hasPath(ConfigurationKeys.DATASET_SUBPATHS_KEY)) {
      List<String> datasetSubpaths = ConfigUtils.getStringList(flowConfig, ConfigurationKeys.DATASET_SUBPATHS_KEY);
      String baseInputPath = ConfigUtils.getString(flowConfig, ConfigurationKeys.DATASET_BASE_INPUT_PATH_KEY, "/");
      String baseOutputPath = ConfigUtils.getString(flowConfig, ConfigurationKeys.DATASET_BASE_OUTPUT_PATH_KEY, "/");
      if (ConfigUtils.getBoolean(flowConfig, ConfigurationKeys.DATASET_COMBINE_KEY, false)) {
        // Combine mode: one spec whose input/output descriptors carry all subpaths as a glob.
        Config newConfig = flowConfig.withoutPath(ConfigurationKeys.DATASET_SUBPATHS_KEY)
            .withValue(DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX + "." + DatasetDescriptorConfigKeys.PATH_KEY,
                ConfigValueFactory.fromAnyRef(baseInputPath))
            .withValue(DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX + "." + DatasetDescriptorConfigKeys.PATH_KEY,
                ConfigValueFactory.fromAnyRef(baseOutputPath))
            .withValue(DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX + ".subPaths",
                ConfigValueFactory.fromAnyRef(convertStringListToGlobPattern(datasetSubpaths)))
            .withValue(DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX + ".subPaths",
                ConfigValueFactory.fromAnyRef(convertStringListToGlobPattern(datasetSubpaths)));
        flowSpecs.add(copyFlowSpecWithNewConfig(flowSpec, newConfig));
      } else {
        // One spec per subpath, each resolved against the base input/output paths.
        for (String subPath : datasetSubpaths) {
          Config newConfig = flowConfig.withoutPath(ConfigurationKeys.DATASET_SUBPATHS_KEY)
              .withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, ConfigValueFactory.fromAnyRef(flowExecutionId))
              .withValue(DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX + "." + DatasetDescriptorConfigKeys.PATH_KEY,
                  ConfigValueFactory.fromAnyRef(new Path(baseInputPath, subPath).toString()))
              .withValue(DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX + "." + DatasetDescriptorConfigKeys.PATH_KEY,
                  ConfigValueFactory.fromAnyRef(new Path(baseOutputPath, subPath).toString()));
          flowSpecs.add(copyFlowSpecWithNewConfig(flowSpec, newConfig));
        }
      }
    } else {
      flowSpecs.add(flowSpec);
    }
    return flowSpecs;
  }

  /**
   * Convert string list to string pattern that will work for globs.
   *
   * e.g. ["test1", "test2", "test3"] -> "{test1,test2,test3}"
   */
  private static String convertStringListToGlobPattern(List<String> stringList) {
    return "{" + Joiner.on(",").join(stringList) + "}";
  }

  /** Clones {@code flowSpec}, substituting {@code newConfig}; URI, version, description and templates are preserved. */
  private static FlowSpec copyFlowSpecWithNewConfig(FlowSpec flowSpec, Config newConfig) {
    FlowSpec.Builder builder = FlowSpec.builder(flowSpec.getUri()).withVersion(flowSpec.getVersion())
        .withDescription(flowSpec.getDescription()).withConfig(newConfig);
    if (flowSpec.getTemplateURIs().isPresent()) {
      builder = builder.withTemplates(flowSpec.getTemplateURIs().get());
    }
    if (flowSpec.getChildSpecs().isPresent()) {
      // NOTE(review): child specs are passed to withTemplates(...) — looks like a copy-paste slip;
      // confirm against FlowSpec.Builder whether withChildSpecs(...) was intended.
      builder = builder.withTemplates(flowSpec.getChildSpecs().get());
    }
    return builder.build();
  }

  /**
   * Register a shutdown hook for this thread.
   */
  private void addShutdownHook() {
    ServiceManager manager = this.serviceManager;
    Runtime.getRuntime().addShutdownHook(new Thread() {
      public void run() {
        // Give the services 5 seconds to stop to ensure that we are responsive to shutdown
        // requests.
        try {
          manager.stopAsync().awaitStopped(5, TimeUnit.SECONDS);
        } catch (TimeoutException timeout) {
          // stopping timed out; proceed with JVM shutdown regardless
        }
      }
    });
  }
}