blob: ae656895272825bfd1c72c4ad126da040816034c [file] [log] [blame]
package backtype.storm;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.Nimbus;
import backtype.storm.generated.StormTopology;
import backtype.storm.utils.BufferFileInputStream;
import backtype.storm.utils.NimbusClient;
import backtype.storm.utils.Utils;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import org.apache.log4j.Logger;
import org.apache.thrift7.TException;
import org.json.simple.JSONValue;
/**
* Use this class to submit topologies to run on the Storm cluster. You should run your program
* with the "storm jar" command from the command-line, and then use this class to
* submit your topologies.
*/
public class StormSubmitter {
public static Logger LOG = Logger.getLogger(StormSubmitter.class);
private static Nimbus.Iface localNimbus = null;
public static void setLocalNimbus(Nimbus.Iface localNimbusHandler) {
StormSubmitter.localNimbus = localNimbusHandler;
}
/**
* Submits a topology to run on the cluster. A topology runs forever or until
* explicitly killed.
*
*
* @param name the name of the storm.
* @param stormConf the topology-specific configuration. See {@link Config}.
* @param topology the processing to execute.
* @throws AlreadyAliveException if a topology with this name is already running
* @throws InvalidTopologyException if an invalid topology was submitted
*/
public static void submitTopology(String name, Map stormConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException {
if(!Utils.isValidConf(stormConf)) {
throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
}
stormConf = new HashMap(stormConf);
stormConf.putAll(Utils.readCommandLineOpts());
Map conf = Utils.readStormConfig();
conf.putAll(stormConf);
try {
String serConf = JSONValue.toJSONString(stormConf);
if(localNimbus!=null) {
LOG.info("Submitting topology " + name + " in local mode");
localNimbus.submitTopology(name, null, serConf, topology);
} else {
submitJar(conf);
NimbusClient client = NimbusClient.getConfiguredClient(conf);
try {
LOG.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
client.getClient().submitTopology(name, submittedJar, serConf, topology);
} finally {
client.close();
}
}
LOG.info("Finished submitting topology: " + name);
} catch(TException e) {
throw new RuntimeException(e);
}
}
private static String submittedJar = null;
private static void submitJar(Map conf) {
if(submittedJar==null) {
LOG.info("Jar not uploaded to master yet. Submitting jar...");
String localJar = System.getProperty("storm.jar");
submittedJar = submitJar(conf, localJar);
} else {
LOG.info("Jar already uploaded to master. Not submitting jar.");
}
}
public static String submitJar(Map conf, String localJar) {
if(localJar==null) {
throw new RuntimeException("Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.");
}
NimbusClient client = NimbusClient.getConfiguredClient(conf);
try {
String uploadLocation = client.getClient().beginFileUpload();
LOG.info("Uploading topology jar " + localJar + " to assigned location: " + uploadLocation);
BufferFileInputStream is = new BufferFileInputStream(localJar);
while(true) {
byte[] toSubmit = is.read();
if(toSubmit.length==0) break;
client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(toSubmit));
}
client.getClient().finishFileUpload(uploadLocation);
LOG.info("Successfully uploaded topology jar to assigned location: " + uploadLocation);
return uploadLocation;
} catch(Exception e) {
throw new RuntimeException(e);
} finally {
client.close();
}
}
}