blob: 9c3150adc8e185b171144eaf834cffaf8beb8a56 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.core.api;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.wayang.commons.util.profiledb.model.Experiment;
import org.apache.wayang.commons.util.profiledb.model.Subject;
import org.apache.wayang.core.monitor.Monitor;
import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator;
import org.apache.wayang.core.plan.executionplan.ExecutionPlan;
import org.apache.wayang.core.plan.wayangplan.WayangPlan;
import org.apache.wayang.core.plugin.Plugin;
import org.apache.wayang.core.profiling.CardinalityRepository;
import org.apache.wayang.core.util.ReflectionUtils;
/**
* This is the entry point for users to work with Wayang.
*/
public class WayangContext {
@SuppressWarnings("unused")
private final Logger logger = LogManager.getLogger(this.getClass());
/**
* Stores input/output cardinalities to provide better {@link CardinalityEstimator}s over time.
*/
private CardinalityRepository cardinalityRepository;
private final Configuration configuration;
public WayangContext() {
this(new Configuration());
}
public WayangContext(Configuration configuration) {
this.configuration = configuration.fork(String.format("WayangContext(%s)", configuration.getName()));
}
/**
* Registers the given {@link Plugin} with this instance.
*
* @param plugin the {@link Plugin} to register
* @return this instance
*/
public WayangContext with(Plugin plugin) {
this.register(plugin);
return this;
}
/**
* Registers the given {@link Plugin} with this instance.
*
* @param plugin the {@link Plugin} to register
* @return this instance
*/
public WayangContext withPlugin(Plugin plugin) {
this.register(plugin);
return this;
}
/**
* Registers the given {@link Plugin} with this instance.
*
* @param plugin the {@link Plugin} to register
* @see #with(Plugin)
*/
public void register(Plugin plugin) {
plugin.configure(this.getConfiguration());
}
/**
* Execute a plan.
*
* @param wayangPlan the plan to execute
* @param udfJars JARs that declare the code for the UDFs
* @see ReflectionUtils#getDeclaringJar(Class)
*/
public void execute(WayangPlan wayangPlan, String... udfJars) {
this.execute(null, wayangPlan, udfJars);
}
/**
* Execute a plan.
*
* @param jobName name of the {@link Job} or {@code null}
* @param wayangPlan the plan to execute
* @param udfJars JARs that declare the code for the UDFs
* @see ReflectionUtils#getDeclaringJar(Class)
*/
public void execute(String jobName, WayangPlan wayangPlan, String... udfJars) {
this.execute(jobName, null, wayangPlan, udfJars);
}
/**
* Execute a plan.
*
* @param jobName name of the {@link Job} or {@code null}
* @param wayangPlan the plan to execute
* @param udfJars JARs that declare the code for the UDFs
* @see ReflectionUtils#getDeclaringJar(Class)
*/
public void execute(String jobName, Monitor monitor, WayangPlan wayangPlan, String... udfJars) {
this.createJob(jobName, monitor, wayangPlan, udfJars).execute();
}
/**
* Execute a plan.
*
* @param jobName name of the {@link Job} or {@code null}
* @param wayangPlan the plan to execute
* @param experiment {@link Experiment} for that profiling entries will be created
* @param udfJars JARs that declare the code for the UDFs
* @see ReflectionUtils#getDeclaringJar(Class)
*/
public void execute(String jobName, WayangPlan wayangPlan, Experiment experiment, String... udfJars) {
this.createJob(jobName, wayangPlan, experiment, udfJars).execute();
}
/**
* Build an execution plan.
*
* @param wayangPlan the plan to translate
* @param udfJars JARs that declare the code for the UDFs
* @see ReflectionUtils#getDeclaringJar(Class)
*/
public ExecutionPlan buildInitialExecutionPlan(String jobName, WayangPlan wayangPlan, String... udfJars) {
return this.createJob(jobName, null, wayangPlan, udfJars).buildInitialExecutionPlan();
}
/**
* Create a new {@link Job} that should execute the given {@link WayangPlan} eventually.
*
* @param experiment {@link Experiment} for that profiling entries will be created
* @see ReflectionUtils#getDeclaringJar(Class)
*/
public Job createJob(String jobName, WayangPlan wayangPlan, Experiment experiment, String... udfJars) {
return new Job(this, jobName, null, wayangPlan, experiment, udfJars);
}
/**
* Create a new {@link Job} that should execute the given {@link WayangPlan} eventually.
*
* @see ReflectionUtils#getDeclaringJar(Class)
*/
public Job createJob(String jobName, WayangPlan wayangPlan, String... udfJars) {
return this.createJob(jobName, null, wayangPlan, udfJars);
}
/**
* Create a new {@link Job} that should execute the given {@link WayangPlan} eventually.
*
* @see ReflectionUtils#getDeclaringJar(Class)
*/
public Job createJob(String jobName, Monitor monitor, WayangPlan wayangPlan, String... udfJars) {
return new Job(this, jobName, monitor, wayangPlan, new Experiment("unknown", new Subject("unknown", "unknown")), udfJars);
}
public Configuration getConfiguration() {
return this.configuration;
}
public CardinalityRepository getCardinalityRepository() {
if (this.cardinalityRepository == null) {
this.cardinalityRepository = new CardinalityRepository(this.configuration);
}
return this.cardinalityRepository;
}
}