---
title: Locality Awareness In LoadAwareShuffleGrouping
layout: documentation
documentation: true
---
# Locality Awareness In LoadAwareShuffleGrouping
### Motivation
Apache Storm has introduced locality awareness to LoadAwareShuffleGrouping based on Bang-Bang control theory.
It aims to keep traffic to closer downstream executors to avoid network latency when those executors are not under heavy load.
It also avoids serialization/deserialization overhead when the traffic stays within the same worker.
### How it works
An executor (say `E`) that uses LoadAwareShuffleGrouping to send tuples to downstream executors groups them into four `scopes` based on their location relative to the executor `E` itself.
The four scopes are:
* `WORKER_LOCAL`: every downstream executor located on the same worker as this executor `E`
* `HOST_LOCAL`: every downstream executor located on the same host as this executor `E`
* `RACK_LOCAL`: every downstream executor located on the same rack as this executor `E`
* `EVERYTHING`: every downstream executor of the executor `E`
`E` starts by sending tuples to the downstream executors in the `WORKER_LOCAL` scope.
Within a scope, downstream executors are chosen based on their load: executors with lower load are more likely to be chosen.
Once the average load of these `WORKER_LOCAL` executors reaches `topology.localityaware.higher.bound`,
`E` switches to the next higher scope, `HOST_LOCAL`, and starts sending tuples to the executors in that scope.
If the average load in the new scope is still higher than the higher bound, it keeps switching to higher scopes.
Conversely, it switches back to a lower scope when the average load of that lower scope falls below `topology.localityaware.lower.bound`.
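A minimal sketch of this Bang-Bang switching logic, using hypothetical names (`Scope`, `averageLoadOf`, `refresh`) rather than Storm's actual internals:

```java
public class ScopeSwitchSketch {
    enum Scope { WORKER_LOCAL, HOST_LOCAL, RACK_LOCAL, EVERYTHING }

    private final double higherBound; // topology.localityaware.higher.bound
    private final double lowerBound;  // topology.localityaware.lower.bound
    private Scope current = Scope.WORKER_LOCAL;

    ScopeSwitchSketch(double higherBound, double lowerBound) {
        this.higherBound = higherBound;
        this.lowerBound = lowerBound;
    }

    /** Re-evaluate the scope whenever fresh load information arrives. */
    Scope refresh() {
        // Escalate while the current scope is overloaded.
        while (current != Scope.EVERYTHING && averageLoadOf(current) > higherBound) {
            current = Scope.values()[current.ordinal() + 1];
        }
        // De-escalate while the next lower scope has cooled down.
        while (current != Scope.WORKER_LOCAL
                && averageLoadOf(Scope.values()[current.ordinal() - 1]) < lowerBound) {
            current = Scope.values()[current.ordinal() - 1];
        }
        return current;
    }

    // Hypothetical stub: in Storm the load comes from sampling the downstream
    // executors in the given scope, as described in the next section.
    private double averageLoadOf(Scope scope) {
        return 0.0;
    }
}
```

The gap between the two bounds forms a hysteresis band, which is what keeps the grouping from flapping back and forth between scopes on small load fluctuations.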
### How is Load calculated
The load of a downstream executor is the maximum of the following two values:
* The population percentage of its receive queue
* `Math.min(pendingMessages, 1024) / 1024`
`pendingMessages`: the upstream executor `E` sends messages to the downstream executor through Netty, and `pendingMessages` is the number of messages that have not yet gotten through to the server.
If the downstream executor is located on the same worker as the executor `E`, its load is simply:
* The population percentage of its receive queue
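A minimal sketch of this formula, assuming the two inputs are sampled elsewhere (they are not actual Storm API calls):

```java
public class LoadSketch {
    static double load(double receiveQueuePct, int pendingMessages, boolean sameWorker) {
        if (sameWorker) {
            // Same-worker traffic bypasses Netty, so only the queue counts.
            return receiveQueuePct;
        }
        // Dividing by 1024.0 keeps the ratio fractional; integer division
        // would collapse it to 0 or 1.
        double pendingLoad = Math.min(pendingMessages, 1024) / 1024.0;
        return Math.max(receiveQueuePct, pendingLoad);
    }
}
```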
### Relationship between Load and Capacity
The capacity of a bolt executor on Storm UI is calculated as:
* `(number executed * average execute latency) / measurement time`
It indicates how busy this executor is: if it is around 1.0, the corresponding Bolt is running as fast as it can. The `__capacity` metric tracks this value for each executor.
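A worked example with illustrative numbers, just to show how the units line up (latency and measurement window both in milliseconds):

```java
double numberExecuted = 5_000;     // tuples executed in the window
double avgExecuteLatencyMs = 1.8;  // average execute latency per tuple
double windowMs = 10_000;          // 10-second measurement window
double capacity = numberExecuted * avgExecuteLatencyMs / windowMs; // 0.9
// A capacity of 0.9 means the executor spent ~90% of the window executing.
```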
The `Capacity` is not directly related to the `Load`:
* If the `Load` of the executor `E1` is high,
    * the `Capacity` of `E1` could be high: the population of the receive queue of `E1` could be high, which means the executor `E1` has more work to do.
    * the `Capacity` could also be low: `pendingMessages` could be high because other executors share the Netty connection between the two workers and are sending too many messages, while the actual population of the receive queue of `E1` is low.
* If the `Load` is low,
    * the `Capacity` could be low: lower `Load` means less work to do.
    * the `Capacity` could also be high: the executor could be receiving tuples and executing tuples at similar average rates.
* If the `Capacity` is high,
    * the `Load` could be high: high `Capacity` means the executor is busy, possibly because it is receiving too many tuples.
    * the `Load` could also be low: the executor could be receiving tuples and executing tuples at similar average rates.
* If the `Capacity` is low,
    * the `Load` could be low: `pendingMessages` could be low.
    * the `Load` could also be high: `pendingMessages` might be very high.
### Troubleshooting
#### I am seeing high capacity (close to 1.0) on some executors and low capacity (close to 0) on other executors
1. It could mean that you can reduce parallelism: your executors are keeping up, so the load never gets high enough to push traffic to the farther scopes, and the executors in those scopes sit mostly idle.
2. You can try to adjust `topology.localityaware.higher.bound` and `topology.localityaware.lower.bound` (see the configuration sketch after this list).
3. You can try to enable `topology.ras.order.executors.by.proximity.needs`. With this config, unassigned executors will be sorted by topological order
with network proximity needs before being scheduled. This is a best-effort attempt to split the topology into slices and to allocate the executors in each slice as physically close to each other as possible.
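A minimal sketch of setting these knobs inside your topology-building code; the `0.9`/`0.1` values are illustrative, not recommended defaults:

```java
import org.apache.storm.Config;

Config conf = new Config();
// Item 2: widen the Bang-Bang bounds (illustrative values, tune for your load).
conf.put("topology.localityaware.higher.bound", 0.9);
conf.put("topology.localityaware.lower.bound", 0.1);
// Item 3: have the scheduler order unassigned executors by proximity needs.
conf.put("topology.ras.order.executors.by.proximity.needs", true);
```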
#### I just want the capacity on every downstream executor to be even
You can turn off LoadAwareShuffleGrouping by setting `topology.disable.loadaware.messaging` to `true`.
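For example, in your topology-building code (a minimal sketch):

```java
import org.apache.storm.Config;

Config conf = new Config();
// Ignore downstream load and fall back to plain shuffle grouping.
conf.put("topology.disable.loadaware.messaging", true);
```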