/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.eagle.app.environment.impl;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.*;
import backtype.storm.utils.NimbusClient;
import com.google.common.base.Preconditions;
import org.apache.eagle.app.Application;
import org.apache.eagle.app.Configuration;
import org.apache.eagle.app.environment.ExecutionRuntime;
import org.apache.eagle.app.environment.ExecutionRuntimeProvider;
import org.apache.eagle.app.utils.DynamicJarPathFinder;
import org.apache.eagle.metadata.model.ApplicationEntity;
import org.apache.thrift7.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.trident.spout.RichSpoutBatchExecutor;

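/**
 * {@link ExecutionRuntime} implementation that runs an application as a Storm topology,
 * submitting either to a shared local {@link LocalCluster} or to a remote cluster via
 * Nimbus, depending on the configured {@link ApplicationEntity.Mode}.
 */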
public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,StormTopology> {
    private final static Logger LOG = LoggerFactory.getLogger(StormExecutionRuntime.class);
    private static LocalCluster _localCluster;

    private StormEnvironment environment;

    // static {
    //     Runtime.getRuntime().addShutdownHook(new Thread(){
    //         @Override
    //         public void run() {
    //             if(_localCluster != null) {
    //                 LOG.info("Shutting down local storm cluster instance");
    //                 _localCluster.shutdown();
    //             }
    //         }
    //     });
    // }
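
    /**
     * Lazily creates the shared {@link LocalCluster} used for local-mode submission and teardown.
     */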
    private static synchronized LocalCluster getLocalCluster(){
        if(_localCluster == null){
            _localCluster = new LocalCluster();
        }
        return _localCluster;
    }

    @Override
    public void prepare(StormEnvironment environment) {
        this.environment = environment;
    }

    @Override
    public StormEnvironment environment() {
        return this.environment;
    }

    private final static String STORM_NIMBUS_HOST_CONF_PATH = "application.storm.nimbusHost";
    private final static String STORM_NIMBUS_HOST_DEFAULT = "localhost";
    private final static Integer STORM_NIMBUS_THRIFT_DEFAULT = 6627;
    private final static String STORM_NIMBUS_THRIFT_CONF_PATH = "application.storm.nimbusThriftPort";
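
    /**
     * Builds the Storm {@link backtype.storm.Config} used for topology submission:
     * batch and buffer sizing plus the Nimbus host and thrift port, which are read from
     * "application.storm.nimbusHost" / "application.storm.nimbusThriftPort" when present
     * and fall back to localhost:6627 otherwise.
     */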
    public backtype.storm.Config getStormConfig(){
        backtype.storm.Config conf = new backtype.storm.Config();
        conf.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, 64 * 1024);
        conf.put(backtype.storm.Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
        conf.put(backtype.storm.Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32);
        conf.put(backtype.storm.Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
        conf.put(backtype.storm.Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
        conf.put(backtype.storm.Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE, 20480000);
        String nimbusHost = STORM_NIMBUS_HOST_DEFAULT;
        if(environment.config().hasPath(STORM_NIMBUS_HOST_CONF_PATH)) {
            nimbusHost = environment.config().getString(STORM_NIMBUS_HOST_CONF_PATH);
            LOG.info("Overriding {} = {}", STORM_NIMBUS_HOST_CONF_PATH, nimbusHost);
        } else {
            LOG.info("Using default {} = {}", STORM_NIMBUS_HOST_CONF_PATH, STORM_NIMBUS_HOST_DEFAULT);
        }
        Integer nimbusThriftPort = STORM_NIMBUS_THRIFT_DEFAULT;
        if(environment.config().hasPath(STORM_NIMBUS_THRIFT_CONF_PATH)) {
            nimbusThriftPort = environment.config().getInt(STORM_NIMBUS_THRIFT_CONF_PATH);
            LOG.info("Overriding {} = {}", STORM_NIMBUS_THRIFT_CONF_PATH, nimbusThriftPort);
        } else {
            LOG.info("Using default {} = {}", STORM_NIMBUS_THRIFT_CONF_PATH, STORM_NIMBUS_THRIFT_DEFAULT);
        }
        conf.put(backtype.storm.Config.NIMBUS_HOST, nimbusHost);
        conf.put(backtype.storm.Config.NIMBUS_THRIFT_PORT, nimbusThriftPort);
        return conf;
    }
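
    /**
     * Executes the application to build its {@link StormTopology} and submits it:
     * in CLUSTER mode through {@link StormSubmitter} (temporarily setting the
     * "storm.jar" system property to the application jar), otherwise to the shared
     * local cluster.
     */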
    @Override
    public <Conf extends Configuration> void start(Application<Conf, StormEnvironment, StormTopology> executor, Conf config){
        String topologyName = config.getAppId();
        Preconditions.checkNotNull(topologyName,"[appId] is required but was null for "+executor.getClass().getCanonicalName());
        StormTopology topology = executor.execute(config, environment);
        LOG.info("Starting {} ({})", topologyName, executor.getClass().getCanonicalName());
        Config conf = getStormConfig();
        if(config.getMode() == ApplicationEntity.Mode.CLUSTER){
            if(config.getJarPath() == null) {
                config.setJarPath(DynamicJarPathFinder.findPath(executor.getClass()));
            }
            String jarFile = config.getJarPath();
            synchronized (StormExecutionRuntime.class) {
                System.setProperty("storm.jar", jarFile);
                LOG.info("Submitting as cluster mode ...");
                try {
                    StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, topology);
                } catch (AlreadyAliveException | InvalidTopologyException e) {
                    LOG.error(e.getMessage(), e);
                    throw new RuntimeException(e.getMessage(), e);
                } finally {
                    System.clearProperty("storm.jar");
                }
            }
        } else {
            LOG.info("Submitting as local mode ...");
            getLocalCluster().submitTopology(topologyName, conf, topology);
            LOG.info("Submitted");
        }
    }
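
    /**
     * Kills the topology named after the application id: via the Nimbus thrift client
     * in CLUSTER mode, or immediately (wait_secs = 0) on the local cluster otherwise.
     */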
    @Override
    public <Conf extends Configuration> void stop(Application<Conf, StormEnvironment, StormTopology> executor, Conf config) {
        String appId = config.getAppId();
        if(config.getMode() == ApplicationEntity.Mode.CLUSTER){
            Nimbus.Client stormClient = NimbusClient.getConfiguredClient(getStormConfig()).getClient();
            try {
                stormClient.killTopology(appId);
            } catch (NotAliveException | TException e) {
                LOG.error("Failed to kill topology named {}, due to: {}", appId, e.getMessage(), e);
            }
        } else {
            KillOptions killOptions = new KillOptions();
            killOptions.set_wait_secs(0);
            getLocalCluster().killTopologyWithOpts(appId, killOptions);
        }
    }

    @Override
    public <Conf extends Configuration> void status(Application<Conf, StormEnvironment, StormTopology> executor, Conf config) {
        // TODO: Not implemented yet!
        throw new UnsupportedOperationException("TODO: status() is not implemented yet!");
    }
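
    /**
     * {@link ExecutionRuntimeProvider} that supplies a fresh StormExecutionRuntime instance.
     */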
    public static class Provider implements ExecutionRuntimeProvider<StormEnvironment, StormTopology> {
        @Override
        public StormExecutionRuntime get() {
            return new StormExecutionRuntime();
        }
    }
}