Apache Storm has introduced locality awareness to LoadAwareShuffleGrouping, based on Bang-Bang control theory. It aims to keep traffic going to closer downstream executors, to avoid network latency, when those executors are not under heavy load. It can also avoid serialization/deserialization overhead when the traffic stays within the same worker.
An executor (say `E`) that uses LoadAwareShuffleGrouping to send tuples to its downstream executors views them in four scopes, based on their locations relative to the executor `E` itself. The four scopes are:

* `WORKER_LOCAL`: every downstream executor located on the same worker as this executor `E`
* `HOST_LOCAL`: every downstream executor located on the same host as this executor `E`
* `RACK_LOCAL`: every downstream executor located on the same rack as this executor `E`
* `EVERYTHING`: every downstream executor of the executor `E`
It starts with sending tuples to the downstream executors in the scope of `WORKER_LOCAL`. The downstream executors in the scope are chosen based on their load: executors with lower load are more likely to be chosen. Once the average load of these `WORKER_LOCAL` executors reaches `topology.localityaware.higher.bound`, it switches to the next higher scope, `HOST_LOCAL`, and starts sending tuples in that scope. If the average load is still higher than the higher bound, it keeps switching to higher scopes.

On the other hand, it switches to a lower scope if the average load of the lower scope is less than `topology.localityaware.lower.bound`.
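The Bang-Bang behavior described above can be summarized with a small sketch. The `ScopeSwitcher` class and its `LocalityScope` enum below are illustrative names only, not Storm's actual implementation (the real logic lives inside LoadAwareShuffleGrouping); the thresholds correspond to the two bound configs.

```java
// Illustrative sketch of the scope switching described above; not Storm's real code.
public class ScopeSwitcher {
    // Scopes ordered from closest to farthest, mirroring the four scopes above.
    enum LocalityScope { WORKER_LOCAL, HOST_LOCAL, RACK_LOCAL, EVERYTHING }

    private LocalityScope current = LocalityScope.WORKER_LOCAL;
    private final double higherBound; // topology.localityaware.higher.bound
    private final double lowerBound;  // topology.localityaware.lower.bound

    ScopeSwitcher(double higherBound, double lowerBound) {
        this.higherBound = higherBound;
        this.lowerBound = lowerBound;
    }

    /** Re-evaluates the scope given the average loads of the current and the lower scope. */
    LocalityScope update(double avgLoadCurrentScope, double avgLoadLowerScope) {
        if (avgLoadCurrentScope >= higherBound && current != LocalityScope.EVERYTHING) {
            // The executors in this scope are overloaded: widen to the next scope.
            current = LocalityScope.values()[current.ordinal() + 1];
        } else if (avgLoadLowerScope < lowerBound && current != LocalityScope.WORKER_LOCAL) {
            // The closer executors have spare capacity again: narrow the scope.
            current = LocalityScope.values()[current.ordinal() - 1];
        }
        return current;
    }

    public static void main(String[] args) {
        ScopeSwitcher s = new ScopeSwitcher(0.8, 0.2);
        System.out.println(s.update(0.9, 0.9)); // HOST_LOCAL: WORKER_LOCAL executors hit the higher bound
        System.out.println(s.update(0.5, 0.1)); // WORKER_LOCAL: the lower scope dropped below the lower bound
    }
}
```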
The load of a downstream executor is the maximum of the following two:

* `pendingMessages`: the upstream executor `E` sends messages to the downstream executor through Netty, and `pendingMessages` is the number of messages that haven't got through to the server yet
* the population of the downstream executor's receive queue, as a fraction of the queue's capacity

If the downstream executor is located on the same worker as the executor `E`, no Netty connection is involved, so the load of that downstream executor is just the population fraction of its receive queue.
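A minimal sketch of that rule, assuming both inputs are already normalized to the 0.0 to 1.0 range (the class and method names are made up for illustration, not Storm's internals):

```java
// Sketch of the load computation described above; names are illustrative only.
public class DownstreamLoad {
    /**
     * @param queuePopulation  receive queue population as a fraction of its capacity (0.0 to 1.0)
     * @param connectionLoad   load derived from the Netty pendingMessages count (0.0 to 1.0)
     * @param sameWorker       true if the downstream executor runs in the same worker as E
     */
    static double load(double queuePopulation, double connectionLoad, boolean sameWorker) {
        if (sameWorker) {
            // No Netty connection is involved, so only the receive queue matters.
            return queuePopulation;
        }
        return Math.max(queuePopulation, connectionLoad);
    }

    public static void main(String[] args) {
        // Remote executor: its queue is only 30% full, but many messages are stuck in Netty.
        System.out.println(load(0.3, 0.9, false)); // 0.9
        // Same-worker executor: only the queue population counts.
        System.out.println(load(0.3, 0.9, true));  // 0.3
    }
}
```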
The capacity of a bolt executor on Storm UI is calculated as `(number of tuples executed in the measurement window) * (average execute latency in ms) / (length of the window in ms)`. It basically means how busy this executor is. If this is around 1.0, the corresponding Bolt is running as fast as it can. A `__capacity` metric exists to track this value for each executor.
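Plugging made-up numbers into that formula shows how to read it:

```java
// Quick arithmetic check of the capacity formula above; all values are invented.
public class CapacityExample {
    public static void main(String[] args) {
        long executed = 180_000;          // tuples executed in the measurement window
        double executeLatencyMs = 2.0;    // average execute latency in milliseconds
        double windowMs = 10 * 60 * 1000; // a 10-minute measurement window

        double capacity = executed * executeLatencyMs / windowMs;
        // 0.6: the executor spent roughly 60% of the window executing tuples.
        System.out.println(capacity);
    }
}
```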
The `Capacity` is not related to the `Load`:

* If the `Load` of the executor `E1` is high:
  * `Capacity` of `E1` could be high: the population of the receive queue of `E1` could be high, which means the executor `E1` has more work to do.
  * `Capacity` could also be low: `pendingMessages` could be high because other executors share the Netty connection between the two workers and are sending too many messages, but the actual population of the receive queue of `E1` might be low.
* If the `Load` of `E1` is low:
  * `Capacity` could be low: lower `Load` means less work to do.
  * `Capacity` could also be high: the executor could be receiving tuples and executing tuples at a similar average rate.
* If the `Capacity` of `E1` is high:
  * `Load` could be high: high `Capacity` means the executor is busy; it could be because it's receiving too many tuples.
  * `Load` could also be low: the executor could be receiving tuples and executing tuples at a similar average rate.
* If the `Capacity` of `E1` is low:
  * `Load` could be low: for example, if `pendingMessages` is low.
  * `Load` could also be high: `pendingMessages` might be very high.
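To make the distinction concrete, here is a small made-up example of the first case (high `Load`, low `Capacity`), reusing the load and capacity calculations sketched above; every number is invented:

```java
// Made-up numbers showing how Load and Capacity can diverge for one executor E1.
public class LoadVsCapacity {
    public static void main(String[] args) {
        // Load side: the shared Netty connection is congested, but E1's own queue is nearly empty.
        double queuePopulation = 0.05; // E1's receive queue is only 5% full
        double connectionLoad = 0.95;  // pendingMessages-based load on the shared connection
        double load = Math.max(queuePopulation, connectionLoad); // 0.95 -> Load is high

        // Capacity side: E1 executed few tuples in the 10-minute window, each taking 1 ms.
        long executed = 6_000;
        double executeLatencyMs = 1.0;
        double windowMs = 10 * 60 * 1000;
        double capacity = executed * executeLatencyMs / windowMs; // 0.01 -> Capacity is low

        System.out.println("load=" + load + ", capacity=" + capacity);
    }
}
```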
If tuples are not being spread to all of the downstream executors (that is, traffic stays in a lower scope), there are a few things to consider:

* It could mean that you could reduce parallelism: your executors are able to keep up and the load never gets to a very high point.
* You can try to adjust `topology.localityaware.higher.bound` and `topology.localityaware.lower.bound`.
* You can try to enable `topology.ras.order.executors.by.proximity.needs`. With this config, unassigned executors will be sorted in topological order with network proximity needs before being scheduled. This is a best effort to split the topology into slices and to place the executors of each slice as physically close to each other as possible.
* You can turn off LoadAwareShuffleGrouping by setting `topology.disable.loadaware.messaging` to `true`.
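If you want to experiment with these knobs, one way to set them when submitting a topology is via the `Config` map, as in the sketch below. The string keys are the ones named on this page; the numeric values are placeholders for illustration, not recommended settings, so check the defaults of your Storm version first.

```java
import org.apache.storm.Config;

public class LocalityTuningExample {
    public static Config buildConf() {
        Config conf = new Config();
        // Bounds used by the Bang-Bang scope switching (placeholder values).
        conf.put("topology.localityaware.higher.bound", 0.8);
        conf.put("topology.localityaware.lower.bound", 0.2);
        // Sort unassigned executors by network proximity needs before scheduling.
        conf.put("topology.ras.order.executors.by.proximity.needs", true);
        // Or opt out of load-aware messaging entirely:
        // conf.put("topology.disable.loadaware.messaging", true);
        return conf;
    }
}
```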