blob: e937443d5f577b307d70b797c3bef14ba62766d8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.heron.scheduler;
import java.util.logging.Logger;
import org.apache.heron.api.generated.TopologyAPI;
import org.apache.heron.proto.system.ExecutionEnvironment;
import org.apache.heron.proto.system.PackingPlans;
import org.apache.heron.scheduler.dryrun.SubmitDryRunResponse;
import org.apache.heron.scheduler.utils.LauncherUtils;
import org.apache.heron.scheduler.utils.Runtime;
import org.apache.heron.spi.common.Config;
import org.apache.heron.spi.common.Context;
import org.apache.heron.spi.packing.PackingException;
import org.apache.heron.spi.packing.PackingPlan;
import org.apache.heron.spi.packing.PackingPlanProtoSerializer;
import org.apache.heron.spi.scheduler.ILauncher;
import org.apache.heron.spi.scheduler.LauncherException;
import org.apache.heron.spi.statemgr.SchedulerStateManagerAdaptor;
/**
* Runs Launcher and launch topology. Also Uploads launch state to state manager.
*/
public class LaunchRunner {
private static final Logger LOG = Logger.getLogger(LaunchRunner.class.getName());
private Config config;
private Config runtime;
private ILauncher launcher;
public LaunchRunner(Config config, Config runtime) {
this.config = config;
this.runtime = runtime;
this.launcher = Runtime.launcherClassInstance(runtime);
}
public ExecutionEnvironment.ExecutionState createExecutionState() {
String releaseUsername = Context.buildUser(config);
// TODO(mfu): Currently we leave release tag empty
String releaseTag = "";
String releaseVersion = Context.buildVersion(config);
TopologyAPI.Topology topology = Runtime.topology(runtime);
ExecutionEnvironment.ExecutionState.Builder builder =
ExecutionEnvironment.ExecutionState.newBuilder();
// set the topology name, id, submitting user and time
builder.setTopologyName(topology.getName()).
setTopologyId(topology.getId())
.setSubmissionTime(System.currentTimeMillis() / 1000)
.setSubmissionUser(Context.submitUser(config))
.setCluster(Context.cluster(config))
.setRole(Context.role(config))
.setEnviron(Context.environ(config));
// build the heron release state
ExecutionEnvironment.HeronReleaseState.Builder releaseBuilder =
ExecutionEnvironment.HeronReleaseState.newBuilder();
releaseBuilder.setReleaseUsername(releaseUsername);
releaseBuilder.setReleaseTag(releaseTag);
releaseBuilder.setReleaseVersion(releaseVersion);
builder.setReleaseState(releaseBuilder);
if (builder.isInitialized()) {
return builder.build();
} else {
throw new RuntimeException("Failed to create execution state");
}
}
/**
* Trim the topology definition for storing into state manager.
* This is because the user generated spouts and bolts
* might be huge.
*
* @return trimmed topology
*/
public TopologyAPI.Topology trimTopology(TopologyAPI.Topology topology) {
// create a copy of the topology physical plan
TopologyAPI.Topology.Builder builder = TopologyAPI.Topology.newBuilder().mergeFrom(topology);
// clear the state of user spout java objects - which can be potentially huge
for (TopologyAPI.Spout.Builder spout : builder.getSpoutsBuilderList()) {
spout.getCompBuilder().clearSerializedObject();
}
// clear the state of user spout java objects - which can be potentially huge
for (TopologyAPI.Bolt.Builder bolt : builder.getBoltsBuilderList()) {
bolt.getCompBuilder().clearSerializedObject();
}
return builder.build();
}
private PackingPlans.PackingPlan createPackingPlan(PackingPlan packingPlan) {
PackingPlanProtoSerializer serializer = new PackingPlanProtoSerializer();
return serializer.toProto(packingPlan);
}
/**
* Call launcher to launch topology
*
* @throws LauncherException
* @throws PackingException
* @throws SubmitDryRunResponse
*/
public void call() throws LauncherException, PackingException, SubmitDryRunResponse {
SchedulerStateManagerAdaptor statemgr = Runtime.schedulerStateManagerAdaptor(runtime);
TopologyAPI.Topology topology = Runtime.topology(runtime);
String topologyName = Context.topologyName(config);
PackingPlan packedPlan = LauncherUtils.getInstance().createPackingPlan(config, runtime);
if (Context.dryRun(config)) {
throw new SubmitDryRunResponse(topology, config, packedPlan);
}
// initialize the launcher
launcher.initialize(config, runtime);
// Set topology def first since we determine whether a topology is running
// by checking the existence of topology def
// store the trimmed topology definition into the state manager
// TODO(rli): log-and-false anti-pattern is too nested on this path. will not refactor
Boolean result = statemgr.setTopology(trimTopology(topology), topologyName);
if (result == null || !result) {
throw new LauncherException(String.format(
"Failed to set topology definition for topology '%s'", topologyName));
}
result = statemgr.setPackingPlan(createPackingPlan(packedPlan), topologyName);
if (result == null || !result) {
statemgr.deleteTopology(topologyName);
throw new LauncherException(String.format(
"Failed to set packing plan for topology '%s'", topologyName));
}
// store the execution state into the state manager
ExecutionEnvironment.ExecutionState executionState = createExecutionState();
result = statemgr.setExecutionState(executionState, topologyName);
if (result == null || !result) {
statemgr.deletePackingPlan(topologyName);
statemgr.deleteTopology(topologyName);
throw new LauncherException(String.format(
"Failed to set execution state for topology '%s'", topologyName));
}
// launch the topology, clear the state if it fails
if (!launcher.launch(packedPlan)) {
statemgr.deleteExecutionState(topologyName);
statemgr.deletePackingPlan(topologyName);
statemgr.deleteTopology(topologyName);
throw new LauncherException(String.format(
"Failed to launch topology '%s'", topologyName));
}
}
}