blob: be12b192165b5ff3f3e33023bf5440b19f38e398 [file] [log] [blame] [view]
---
title: Multi-Lang Protocol
layout: documentation
documentation: true
---
This page explains the multilang protocol as of Storm 0.7.1. Versions prior to 0.7.1 used a somewhat different protocol, documented [here](Storm-multi-language-protocol-(versions-0.7.0-and-below\).html).
# Storm Multi-Language Protocol
## Supported Lanugages
Storm Multi-Language has implementation in the following languages:
- [JavaScript](https://github.com/apache/storm/tree/master/storm-multilang/javascript)
- [Python](https://github.com/apache/storm/tree/master/storm-multilang/python)
- [Ruby](https://github.com/apache/storm/tree/master/storm-multilang/ruby)
Third party libraries are available for the following languages:
- [c# (on .net core 2.0)](https://github.com/Azure/net-storm-multilang-adapter)
## Shell Components
Support for multiple languages is implemented via the ShellBolt,
ShellSpout, and ShellProcess classes. These classes implement the
IBolt and ISpout interfaces and the protocol for executing a script or
program via the shell using Java's ProcessBuilder class.
### Packaging of shell scripts
By default the ShellProcess assumes that your code is packaged inside of your topology jar under the resources subdirectory of your jar and by default will change the current working directory of
the executable process to be that resources directory extracted from the jar.
A jar file does not store permissions of the files in it. This includes the execute bit that would allow a shell script to be laoded and run by the operating systme.
As such in most examples the scripts are of the form `python mybolt.py` because the python executable is already on the supervisor and mybolt is packaged in the resources directory of the jar.
If you want to package something more complicated, like a new version of python itself, you need to instead use the blob store for this and a `.tgz` archive that does support permissions.
See the docs on the [Blob Store](distcache-blobstore.html) for more details on how to ship a jar.
To make a ShellBolt/ShellSpout work with executables + scripts shipped in the blob store dist cache add
```
changeChildCWD(false);
```
in the constructor of your ShellBolt/ShellSpout. The shell command will then be relative to the cwd of the worker. Where the sym-links to the resources are.
So if I shipped python with a symlink named `newPython` and a python ShellSpout I shipped into `shell_spout.py` I would have a something like
```
public MyShellSpout() {
super("./newPython/bin/python", "./shell_spout.py");
changeChildCWD(false);
}
```
## Output fields
Output fields are part of the Thrift definition of the topology. This means that when you multilang in Java, you need to create a bolt that extends ShellBolt, implements IRichBolt, and declare the fields in `declareOutputFields` (similarly for ShellSpout).
You can learn more about this on [Concepts](Concepts.html)
## Protocol Preamble
A simple protocol is implemented via the STDIN and STDOUT of the
executed script or program. All data exchanged with the process is
encoded in JSON, making support possible for pretty much any language.
# Packaging Your Stuff
To run a shell component on a cluster, the scripts that are shelled
out to must be in the `resources/` directory within the jar submitted
to the master.
However, during development or testing on a local machine, the resources
directory just needs to be on the classpath.
## The Protocol
Notes:
* Both ends of this protocol use a line-reading mechanism, so be sure to
trim off newlines from the input and to append them to your output.
* All JSON inputs and outputs are terminated by a single line containing "end". Note that this delimiter is not itself JSON encoded.
* The bullet points below are written from the perspective of the script writer's
STDIN and STDOUT.
### Initial Handshake
The initial handshake is the same for both types of shell components:
* STDIN: Setup info. This is a JSON object with the Storm configuration, a PID directory, and a topology context, like this:
```
{
"conf": {
"topology.message.timeout.secs": 3,
// etc
},
"pidDir": "...",
"context": {
"task->component": {
"1": "example-spout",
"2": "__acker",
"3": "example-bolt1",
"4": "example-bolt2"
},
"taskid": 3,
// Everything below this line is only available in Storm 0.10.0+
"componentid": "example-bolt"
"stream->target->grouping": {
"default": {
"example-bolt2": {
"type": "SHUFFLE"}}},
"streams": ["default"],
"stream->outputfields": {"default": ["word"]},
"source->stream->grouping": {
"example-spout": {
"default": {
"type": "FIELDS",
"fields": ["word"]
}
}
}
"source->stream->fields": {
"example-spout": {
"default": ["word"]
}
}
}
}
```
Your script should create an empty file named with its PID in this directory. e.g.
the PID is 1234, so an empty file named 1234 is created in the directory. This
file lets the supervisor know the PID so it can shutdown the process later on.
As of Storm 0.10.0, the context sent by Storm to shell components has been
enhanced substantially to include all aspects of the topology context available
to JVM components. One key addition is the ability to determine a shell
component's source and targets (i.e., inputs and outputs) in the topology via
the `stream->target->grouping` and `source->stream->grouping` dictionaries. At
the innermost level of these nested dictionaries, groupings are represented as
a dictionary that minimally has a `type` key, but can also have a `fields` key
to specify which fields are involved in a `FIELDS` grouping.
* STDOUT: Your PID, in a JSON object, like `{"pid": 1234}`. The shell component will log the PID to its log.
What happens next depends on the type of component:
### Spouts
Shell spouts are synchronous. The rest happens in a while(true) loop:
* STDIN: Either a next, ack, activate, deactivate or fail command.
"next" is the equivalent of ISpout's `nextTuple`. It looks like:
```
{"command": "next"}
```
"ack" looks like:
```
{"command": "ack", "id": "1231231"}
```
"activate" is the equivalent of ISpout's `activate`:
```
{"command": "activate"}
```
"deactivate" is the equivalent of ISpout's `deactivate`:
```
{"command": "deactivate"}
```
"fail" looks like:
```
{"command": "fail", "id": "1231231"}
```
* STDOUT: The results of your spout for the previous command. This can
be a sequence of emits and logs.
An emit looks like:
```
{
"command": "emit",
// The id for the tuple. Leave this out for an unreliable emit. The id can
// be a string or a number.
"id": "1231231",
// The id of the stream this tuple was emitted to. Leave this empty to emit to default stream.
"stream": "1",
// If doing an emit direct, indicate the task to send the tuple to
"task": 9,
// All the values in this tuple
"tuple": ["field1", 2, 3]
}
```
If not doing an emit direct, you will immediately receive the task ids to which the tuple was emitted on STDIN as a JSON array.
A "log" will log a message in the worker log. It looks like:
```
{
"command": "log",
// the message to log
"msg": "hello world!"
}
```
* STDOUT: a "sync" command ends the sequence of emits and logs. It looks like:
```
{"command": "sync"}
```
After you sync, ShellSpout will not read your output until it sends another next, ack, or fail command.
Note that, similarly to ISpout, all of the spouts in the worker will be locked up after a next, ack, or fail, until you sync. Also like ISpout, if you have no tuples to emit for a next, you should sleep for a small amount of time before syncing. ShellSpout will not automatically sleep for you.
### Bolts
The shell bolt protocol is asynchronous. You will receive tuples on STDIN as soon as they are available, and you may emit, ack, and fail, and log at any time by writing to STDOUT, as follows:
* STDIN: A tuple! This is a JSON encoded structure like this:
```
{
// The tuple's id - this is a string to support languages lacking 64-bit precision
"id": "-6955786537413359385",
// The id of the component that created this tuple
"comp": "1",
// The id of the stream this tuple was emitted to
"stream": "1",
// The id of the task that created this tuple
"task": 9,
// All the values in this tuple
"tuple": ["snow white and the seven dwarfs", "field2", 3]
}
```
* STDOUT: An ack, fail, emit, or log. Emits look like:
```
{
"command": "emit",
// The ids of the tuples this output tuples should be anchored to
"anchors": ["1231231", "-234234234"],
// The id of the stream this tuple was emitted to. Leave this empty to emit to default stream.
"stream": "1",
// If doing an emit direct, indicate the task to send the tuple to
"task": 9,
// All the values in this tuple
"tuple": ["field1", 2, 3]
}
```
If not doing an emit direct, you will receive the task ids to which
the tuple was emitted on STDIN as a JSON array. Note that, due to the
asynchronous nature of the shell bolt protocol, when you read after
emitting, you may not receive the task ids. You may instead read the
task ids for a previous emit or a new tuple to process. You will
receive the task id lists in the same order as their corresponding
emits, however.
An ack looks like:
```
{
"command": "ack",
// the id of the tuple to ack
"id": "123123"
}
```
A fail looks like:
```
{
"command": "fail",
// the id of the tuple to fail
"id": "123123"
}
```
A "log" will log a message in the worker log. It looks like:
```
{
"command": "log",
// the message to log
"msg": "hello world!"
}
```
* Note that, as of version 0.7.1, there is no longer any need for a
shell bolt to 'sync'.
### Handling Heartbeats (0.9.3 and later)
As of Storm 0.9.3, heartbeats have been between ShellSpout/ShellBolt and their
multi-lang subprocesses to detect hanging/zombie subprocesses. Any libraries
for interfacing with Storm via multi-lang must take the following actions
regarding hearbeats:
#### Spout
Shell spouts are synchronous, so subprocesses always send `sync` commands at the
end of `next()`, so you should not have to do much to support heartbeats for
spouts. That said, you must not let subprocesses sleep more than the worker
timeout during `next()`.
#### Bolt
Shell bolts are asynchronous, so a ShellBolt will send heartbeat tuples to its
subprocess periodically. Heartbeat tuple looks like:
```
{
"id": "-6955786537413359385",
"comp": "1",
"stream": "__heartbeat",
// this shell bolt's system task id
"task": -1,
"tuple": []
}
```
When subprocess receives heartbeat tuple, it must send a `sync` command back to
ShellBolt.