tree 10650bb353e9e5565b25d60c62d314d58e4217a9
parent b924acacb0c555370a593f3a069187cf8b8081d7
author Enrico Minack <github@enrico.minack.dev> 1708417190 +0100
committer GitHub <noreply@github.com> 1708417190 +0800
gpgsig -----BEGIN PGP SIGNATURE-----
 
 wsFcBAABCAAQBQJl1GCmCRC1aQ7uu5UhlAAAJiUQAEbOV9pT1l2D9JFBGF2LQmZ3
 PhjHCiQfQBW1q71YwPx1tRjcuzk0zsdlqgMgrwoz2a4/ns3BXBFdyJujN5D5q5vJ
 rDR9qBZL+Lbp/QEbqswBpa8+q0CYgH58glEreXDAWgbC9J0dZ6CKARAQ+aqEmPhL
 hM2otJIjmxn0r/iF/LIBTNrTKw38y+somia3gEm1mh6s53audTgsNNqKgl1cUeec
 /9Qr9y1pPNbyIp9J/KveoW/uawkV+fFwLoXJAk+TuZVOYNimzRaRejYOA30SnKA4
 UBxxfVsK0kNP+n7GDgiYIAI0RS/q5VvIS7cNaC0OkbnlvbXxJP5CZJKy2fk5/Aij
 d8wqmnOVunQ948bSnNUIfK0J9zytvqIvlMZvK2j36tM01pp4eIoH2mHkX6G/N7Hw
 ljaWq+g1CsWG5pIxzjCNgrcvtqSGEqHOuexGSKBjaRrfXOeahPw+YYI93GwgA5Fx
 7wVNNN6ruAtr+eSz0UgjxFckaLXuseQc4Prrv3KPdXXWxliU8DplBjjDHZVcDNop
 ipywlBP+AaoxZULPauvysjKhiLX0G7maIuBknR3uQqjzbmGJ8eq6Ht6DYNQs15ku
 ZmeFRQAa0/cWQxxvNUSyTeZ6HeFjYf2/iSv2tz+FDXoS72hslLZ4kB2YCojhjsYv
 eKxndaZr+c/+zghMXOAl
 =kc9C
 -----END PGP SIGNATURE-----
 

[#134] improvement(spark3): Use taskId and attemptNo as taskAttemptId (#1529)

### What changes were proposed in this pull request?
Use map index and task attempt number as the task attempt id in Spark3.

This requires to rework the bits of the blockId to maximize bit utilization for Spark3:
https://github.com/apache/incubator-uniffle/blob/b924acacb0c555370a593f3a069187cf8b8081d7/common/src/main/java/org/apache/uniffle/common/util/Constants.java#L30-L35

Ideally, the `TASK_ATTEMPT_ID_MAX_LENGTH` is set equal to `PARTITION_ID_MAX_LENGTH` + the number of bits required to store the largest task attempt number. The largest task attempt number is `maxFailures - 1`, or `maxFailures` if speculative execution is enabled (configured via `spark.speculation` and disabled by default). The `maxFailures` is configured via `spark.task.maxFailures` and defaults to 4. So by default, two bits are required to store the largest attempt number and `TASK_ATTEMPT_ID_MAX_LENGTH` should be set to `PARTITION_ID_MAX_LENGTH + 2`.

Example:

- with `PARTITION_ID_MAX_LENGTH = 20`, Uniffle supports 1,048,576 partitions
- requiring `TASK_ATTEMPT_ID_MAX_LENGTH = 22`
- allowing for `ATOMIC_INT_MAX_LENGTH = 21`.

### Why are the changes needed?
The map index (map partition id) is limited to the number of partitions of a shuffle. The task attempt number is limited by the max number of failures configured by `spark.task.maxFailures`, which defaults to 4. This provides us an id that is unique per shuffe while not growing arbitrarily large as `context.taskAttemptId` does.

Fix: #134

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Unit and integration tests.