---
title: Locality Awareness In LoadAwareShuffleGrouping
layout: documentation
documentation: true
---

# Locality Awareness In LoadAwareShuffleGrouping

### Motivation

Apache Storm has introduced locality awareness to LoadAwareShuffleGrouping, based on Bang-Bang control theory.
It aims to route traffic to closer downstream executors, to avoid network latency when those executors are not under heavy load.
It can also avoid serialization/deserialization overhead when the traffic stays within the same worker.

### How it works

An executor (say `E`) that uses LoadAwareShuffleGrouping to send tuples to downstream executors groups them into four `scopes`, based on their locations relative to the executor `E` itself.
The four scopes are:

* `WORKER_LOCAL`: every downstream executor located on the same worker as this executor `E`
* `HOST_LOCAL`: every downstream executor located on the same host as this executor `E`
* `RACK_LOCAL`: every downstream executor located on the same rack as this executor `E`
* `EVERYTHING`: every downstream executor of the executor `E`
The executor `E` starts by sending tuples to the downstream executors in the `WORKER_LOCAL` scope.
Within a scope, downstream executors are chosen based on their load: executors with lower load are more likely to be chosen.
Once the average load of these `WORKER_LOCAL` executors reaches `topology.localityaware.higher.bound`,
`E` switches to the next higher scope, `HOST_LOCAL`, and starts sending tuples in that scope.
If the average load of the new scope is still higher than the `higher bound`, it keeps switching to higher scopes.

On the other hand, it switches back to a lower scope whenever the average load of that lower scope drops below `topology.localityaware.lower.bound`.
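
This is the Bang-Bang (hysteresis) pattern: the grouping only escalates past the `higher bound` and only de-escalates below the `lower bound`, which keeps it from flapping rapidly between scopes. A minimal sketch of the switching logic is below; the class and method names are illustrative, not Storm's actual internals:

```java
import java.util.function.Function;

// Hypothetical sketch of the Bang-Bang scope switching, not Storm's code.
enum Scope { WORKER_LOCAL, HOST_LOCAL, RACK_LOCAL, EVERYTHING }

class ScopeSwitcher {
    private final double higherBound; // topology.localityaware.higher.bound
    private final double lowerBound;  // topology.localityaware.lower.bound
    private Scope current = Scope.WORKER_LOCAL;

    ScopeSwitcher(double higherBound, double lowerBound) {
        this.higherBound = higherBound;
        this.lowerBound = lowerBound;
    }

    // avgLoadOf returns the average load of the downstream executors in a scope.
    Scope update(Function<Scope, Double> avgLoadOf) {
        // Escalate while the current scope is overloaded.
        while (current != Scope.EVERYTHING && avgLoadOf.apply(current) > higherBound) {
            current = Scope.values()[current.ordinal() + 1];
        }
        // De-escalate while the next lower scope has spare capacity.
        while (current != Scope.WORKER_LOCAL
                && avgLoadOf.apply(Scope.values()[current.ordinal() - 1]) < lowerBound) {
            current = Scope.values()[current.ordinal() - 1];
        }
        return current;
    }
}
```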


### How is Load calculated

The load of a downstream executor is the maximum of the following two values:

* The population percentage of the receive queue
* `Math.min(pendingMessages, 1024) / 1024`

`pendingMessages`: the upstream executor `E` sends messages to the downstream executor through a Netty connection, and `pendingMessages` is the number of messages that have not yet gotten through to the server.

If the downstream executor is located on the same worker as the executor `E`, there is no Netty connection between them, so the load of that downstream executor is simply:
* The population percentage of the receive queue
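
Putting the two rules together, a minimal sketch of the load computation looks like the following; the class and parameter names are illustrative, not Storm's actual code:

```java
// Hypothetical sketch of the load computation described above.
class LoadCalculator {
    /**
     * @param queuePopulationPct receive queue population as a fraction in [0.0, 1.0]
     * @param pendingMessages    messages not yet through the Netty connection
     * @param sameWorker         whether the downstream executor shares E's worker
     */
    static double load(double queuePopulationPct, int pendingMessages, boolean sameWorker) {
        if (sameWorker) {
            // Same-worker traffic does not cross a Netty connection,
            // so only the receive queue population matters.
            return queuePopulationPct;
        }
        double pendingLoad = Math.min(pendingMessages, 1024) / 1024.0;
        return Math.max(queuePopulationPct, pendingLoad);
    }
}
```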

### Relationship between Load and Capacity

The capacity of a bolt executor on Storm UI is calculated as:
* `(number executed * average execute latency) / measurement time`

It indicates how busy this executor is: if it is around 1.0, the corresponding Bolt is running as fast as it can. A `__capacity` metric exists to track this value for each executor.
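
As a worked example (the numbers below are made up for illustration), an executor that executed 9,000 tuples in a 10-minute window with an average execute latency of 60 ms has a capacity of 0.9:

```java
// Illustrative capacity calculation, mirroring the Storm UI formula above.
public class CapacityExample {
    public static void main(String[] args) {
        long executed = 9_000;              // tuples executed in the window
        double avgExecuteLatencyMs = 60.0;  // average execute latency per tuple
        double windowMs = 600_000.0;        // 10-minute measurement window
        double capacity = executed * avgExecuteLatencyMs / windowMs;
        System.out.println(capacity);       // prints 0.9
    }
}
```

In other words, that executor spent roughly 90% of the measurement window executing tuples.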

The `Capacity` is not directly related to the `Load`:

* If the `Load` of the executor `E1` is high,
  * the `Capacity` of `E1` could be high: the population of the receive queue of `E1` could be high, which means the executor `E1` has more work to do.
  * the `Capacity` could also be low: `pendingMessages` could be high because other executors share the Netty connection between the two workers and they are sending too many messages.
    But the actual population of the receive queue of `E1` might be low.
* If the `Load` is low,
  * the `Capacity` could be low: a lower `Load` means less work to do.
  * the `Capacity` could also be high: the executor could be receiving tuples and executing tuples at similar average rates.
* If the `Capacity` is high,
  * the `Load` could be high: a high `Capacity` means the executor is busy, possibly because it is receiving too many tuples.
  * the `Load` could also be low: the executor could be receiving tuples and executing tuples at similar average rates.
* If the `Capacity` is low,
  * the `Load` could be low: `pendingMessages` might be low.
  * the `Load` could also be high: `pendingMessages` might be very high.


### Troubleshooting

#### I am seeing high capacity (close to 1.0) on some executors and low capacity (close to 0) on other executors

1. It could mean that you can reduce parallelism. Your executors are able to keep up, and the load never gets to a very high point.

2. You can try to adjust `topology.localityaware.higher.bound` and `topology.localityaware.lower.bound` (see the sketch after this list).

3. You can try to enable `topology.ras.order.executors.by.proximity.needs`. With this config, unassigned executors will be sorted by topological order
with network proximity needs before being scheduled. This is a best-effort attempt to split the topology into slices and allocate the executors in each slice to physical locations that are as close as possible.
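
For example, the bounds and the ordering flag could be set in the topology configuration like this (the values below are purely illustrative, not recommendations):

```java
import org.apache.storm.Config;

Config conf = new Config();
// Escalate to a wider scope once the average load of the current
// scope exceeds this bound (illustrative value).
conf.put("topology.localityaware.higher.bound", 0.8);
// Fall back to a narrower scope once its average load drops below
// this bound (illustrative value).
conf.put("topology.localityaware.lower.bound", 0.2);
// Ask the resource-aware scheduler to order executors by network
// proximity needs when scheduling.
conf.put("topology.ras.order.executors.by.proximity.needs", true);
```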


#### I just want the capacity on every downstream executor to be even

You can turn off LoadAwareShuffleGrouping by setting `topology.disable.loadaware.messaging` to `true`.
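
For example, in the topology configuration (using the config key exactly as above):

```java
import org.apache.storm.Config;

Config conf = new Config();
// Disable LoadAwareShuffleGrouping; plain shuffle grouping is used instead,
// spreading tuples across downstream executors regardless of load.
conf.put("topology.disable.loadaware.messaging", true);
```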