HADOOP-19654. Upgrade AWS SDK to 2.35.4 (#7882)



AWS SDK upgraded to 2.35.4.

This SDK has changed checksum/checksum headers handling significantly,
causing problems with third party stores, and, in some combinations
AWS S3 itself.

The S3A connector has retained old behavior; options to change
these settings are now available.

The default settings are chosen for maximum compatiblity and performance.

fs.s3a.request.md5.header:       true
fs.s3a.checksum.generation:      false
fs.s3a.create.checksum.algorithm: ""

Consult the documentation for more details.

Contributed by Steve Loughran
diff --git a/LICENSE-binary b/LICENSE-binary
index b61b7f3..11d5678 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -352,7 +352,7 @@
 io.swagger:swagger-annotations:1.5.4
 javax.inject:javax.inject:1
 net.java.dev.jna:jna:5.2.0
-net.minidev:accessors-smart:1.2
+net.minidev:accessors-smart:1.21
 org.apache.avro:avro:1.11.4
 org.apache.commons:commons-compress:1.26.1
 org.apache.commons:commons-configuration2:2.10.1
@@ -419,7 +419,7 @@
 org.yaml:snakeyaml:2.0
 org.wildfly.openssl:wildfly-openssl:2.2.5.Final
 ro.isdc.wro4j:wro4j-maven-plugin:1.8.0
-software.amazon.awssdk:bundle:2.29.52
+software.amazon.awssdk:bundle:2.35.4
 software.amazon.s3.analyticsaccelerator:analyticsaccelerator-s3:1.3.0
 net.jodah:failsafe:2.4.4
 
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractUnbufferTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractUnbufferTest.java
index 59eb57a..0392c22 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractUnbufferTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractUnbufferTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.fs.contract;
 
+import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -30,6 +31,7 @@
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.readNBytes;
 
 /**
  * Contract tests for {@link org.apache.hadoop.fs.CanUnbuffer#unbuffer}.
@@ -145,10 +147,12 @@ protected void validateFileContents(FSDataInputStream stream, int length,
                                       int startIndex)
           throws IOException {
     byte[] streamData = new byte[length];
-    assertEquals(length, stream.read(streamData),
-        "failed to read expected number of bytes from "
-        + "stream. This may be transient");
+    final int read = readNBytes(stream, streamData, 0, length);
+    Assertions.assertThat(read)
+        .describedAs("failed to read expected number of bytes from stream. %s", stream)
+        .isEqualTo(length);
     byte[] validateFileBytes;
+
     if (startIndex == 0 && length == fileBytes.length) {
       validateFileBytes = fileBytes;
     } else {
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 9967f3d..7dbbd0b 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -209,7 +209,7 @@
     <make-maven-plugin.version>1.0-beta-1</make-maven-plugin.version>
     <surefire.fork.timeout>900</surefire.fork.timeout>
     <aws-java-sdk.version>1.12.720</aws-java-sdk.version>
-    <aws-java-sdk-v2.version>2.29.52</aws-java-sdk-v2.version>
+    <aws-java-sdk-v2.version>2.35.4</aws-java-sdk-v2.version>
     <amazon-s3-encryption-client-java.version>3.1.1</amazon-s3-encryption-client-java.version>
     <amazon-s3-analyticsaccelerator-s3.version>1.3.0</amazon-s3-analyticsaccelerator-s3.version>
     <aws.eventstream.version>1.0.1</aws.eventstream.version>
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSClientIOException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSClientIOException.java
index b61667d..af187e3 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSClientIOException.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSClientIOException.java
@@ -54,4 +54,8 @@ public String getMessage() {
   public boolean retryable() {
     return getCause().retryable();
   }
+
+  public String getOperation() {
+    return operation;
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSNoResponseException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSNoResponseException.java
index b856271..49ebd3a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSNoResponseException.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSNoResponseException.java
@@ -24,6 +24,12 @@
  * Status code 443, no response from server. This is considered idempotent.
  */
 public class AWSNoResponseException extends AWSServiceIOException {
+
+  /**
+   * Constructor.
+   * @param operation operation in progress.
+   * @param cause inner cause
+   */
   public AWSNoResponseException(String operation,
       AwsServiceException cause) {
     super(operation, cause);
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index 3618065..75db900 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -21,6 +21,7 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.s3a.impl.ChecksumSupport;
 import org.apache.hadoop.fs.s3a.impl.streams.StreamIntegration;
 import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
 
@@ -1837,15 +1838,53 @@ private Constants() {
   public static final boolean CHECKSUM_VALIDATION_DEFAULT = false;
 
   /**
+   * Should checksums always be generated?
+   * Not all third-party stores like this being enabled for every request.
+   * Value: {@value}.
+   */
+  public static final String CHECKSUM_GENERATION =
+      "fs.s3a.checksum.generation";
+
+  /**
+   * Default value of {@link #CHECKSUM_GENERATION}.
+   * Value: {@value}.
+   */
+  public static final boolean DEFAULT_CHECKSUM_GENERATION = false;
+
+  /**
    * Indicates the algorithm used to create the checksum for the object
    * to be uploaded to S3. Unset by default. It supports the following values:
-   * 'CRC32', 'CRC32C', 'SHA1', and 'SHA256'
+   * 'CRC32', 'CRC32C', 'SHA1', 'SHA256', 'CRC64_NVME 'NONE', ''.
+   * When checksum calculation is enabled this MUST be set to a valid algorithm.
    * value:{@value}
    */
   public static final String CHECKSUM_ALGORITHM =
       "fs.s3a.create.checksum.algorithm";
 
   /**
+   * Default checksum algorithm: {@code "NONE"}.
+   */
+  public static final String DEFAULT_CHECKSUM_ALGORITHM =
+      ChecksumSupport.NONE;
+
+  /**
+   * Send a {@code Content-MD5 header} with every request.
+   * This is required when performing some operations with third party stores
+   * For example: bulk delete).
+   * It is supported by AWS S3, though has unexpected behavior with AWS S3 Express storage.
+   * See https://github.com/aws/aws-sdk-java-v2/issues/6459  for details.
+   */
+  public static final String REQUEST_MD5_HEADER =
+      "fs.s3a.request.md5.header";
+
+  /**
+   * Default value of {@link #REQUEST_MD5_HEADER}.
+   * Value: {@value}.
+   */
+  public static final boolean DEFAULT_REQUEST_MD5_HEADER = true;
+
+
+  /**
    * Are extensions classes, such as {@code fs.s3a.aws.credentials.provider},
    * going to be loaded from the same classloader that loaded
    * the {@link S3AFileSystem}?
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
index 7b5aa5f..41e904e 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
@@ -30,6 +30,8 @@
 import org.slf4j.LoggerFactory;
 
 import software.amazon.awssdk.awscore.util.AwsHostNameUtils;
+import software.amazon.awssdk.core.checksums.RequestChecksumCalculation;
+import software.amazon.awssdk.core.checksums.ResponseChecksumValidation;
 import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
 import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
 import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
@@ -41,6 +43,7 @@
 import software.amazon.awssdk.metrics.LoggingMetricPublisher;
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.s3accessgrants.plugin.S3AccessGrantsPlugin;
+import software.amazon.awssdk.services.s3.LegacyMd5Plugin;
 import software.amazon.awssdk.services.s3.S3AsyncClient;
 import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
 import software.amazon.awssdk.services.s3.S3BaseClientBuilder;
@@ -202,11 +205,34 @@ private <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT> Build
 
     configureEndpointAndRegion(builder, parameters, conf);
 
+    // add a plugin to add a Content-MD5 header.
+    // this is required when performing some operations with third party stores
+    // (for example: bulk delete), and is somewhat harmless when working with AWS S3.
+    if (parameters.isMd5HeaderEnabled()) {
+      LOG.debug("MD5 header enabled");
+      builder.addPlugin(LegacyMd5Plugin.create());
+    }
+
+    //when to calculate request checksums.
+    final RequestChecksumCalculation checksumCalculation =
+        parameters.isChecksumCalculationEnabled()
+            ? RequestChecksumCalculation.WHEN_SUPPORTED
+            : RequestChecksumCalculation.WHEN_REQUIRED;
+    LOG.debug("Using checksum calculation policy: {}", checksumCalculation);
+    builder.requestChecksumCalculation(checksumCalculation);
+
+    // response checksum validation. Slow, even with CRC32 checksums.
+    final ResponseChecksumValidation checksumValidation;
+    checksumValidation = parameters.isChecksumValidationEnabled()
+        ? ResponseChecksumValidation.WHEN_SUPPORTED
+        : ResponseChecksumValidation.WHEN_REQUIRED;
+    LOG.debug("Using checksum validation policy: {}", checksumValidation);
+    builder.responseChecksumValidation(checksumValidation);
+
     maybeApplyS3AccessGrantsConfigurations(builder, conf);
 
     S3Configuration serviceConfiguration = S3Configuration.builder()
         .pathStyleAccessEnabled(parameters.isPathStyleAccess())
-        .checksumValidationEnabled(parameters.isChecksumValidationEnabled())
         .build();
 
     final ClientOverrideConfiguration.Builder override =
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 079b402..863a63f 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -1173,10 +1173,15 @@ private ClientManager createClientManager(URI fsURI, boolean dtEnabled) throws I
         .withTransferManagerExecutor(unboundedThreadPool)
         .withRegion(configuredRegion)
         .withFipsEnabled(fipsEnabled)
+        .withS3ExpressStore(s3ExpressStore)
         .withExpressCreateSession(
             conf.getBoolean(S3EXPRESS_CREATE_SESSION, S3EXPRESS_CREATE_SESSION_DEFAULT))
         .withChecksumValidationEnabled(
             conf.getBoolean(CHECKSUM_VALIDATION, CHECKSUM_VALIDATION_DEFAULT))
+        .withChecksumCalculationEnabled(
+            conf.getBoolean(CHECKSUM_GENERATION, DEFAULT_CHECKSUM_GENERATION))
+        .withMd5HeaderEnabled(conf.getBoolean(REQUEST_MD5_HEADER,
+            DEFAULT_REQUEST_MD5_HEADER))
         .withClientSideEncryptionEnabled(isCSEEnabled)
         .withClientSideEncryptionMaterials(cseMaterials)
         .withAnalyticsAcceleratorEnabled(isAnalyticsAcceleratorEnabled)
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
index 63ad42d..af47081 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.fs.s3a;
 
+import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
 import software.amazon.awssdk.awscore.exception.AwsServiceException;
 import software.amazon.awssdk.core.exception.AbortedException;
 import software.amazon.awssdk.core.exception.ApiCallAttemptTimeoutException;
@@ -240,8 +241,13 @@ public static IOException translateException(@Nullable String operation,
           ? (S3Exception) ase
           : null;
       int status = ase.statusCode();
-      if (ase.awsErrorDetails() != null) {
-        message = message + ":" + ase.awsErrorDetails().errorCode();
+      // error details, may be null
+      final AwsErrorDetails errorDetails = ase.awsErrorDetails();
+      // error code, will be null if errorDetails is null
+      String errorCode = "";
+      if (errorDetails != null) {
+        errorCode = errorDetails.errorCode();
+        message = message + ":" + errorCode;
       }
 
       // big switch on the HTTP status code.
@@ -308,6 +314,8 @@ public static IOException translateException(@Nullable String operation,
       // precondition failure: the object is there, but the precondition
       // (e.g. etag) didn't match. Assume remote file change during
       // rename or status passed in to openfile had an etag which didn't match.
+      // See the SC_200 handler for the treatment of the S3 Express failure
+      // variant.
       case SC_412_PRECONDITION_FAILED:
         ioe = new RemoteFileChangedException(path, message, "", ase);
         break;
@@ -352,6 +360,16 @@ public static IOException translateException(@Nullable String operation,
           return ((MultiObjectDeleteException) exception)
               .translateException(message);
         }
+        if (PRECONDITION_FAILED.equals(errorCode)) {
+          // S3 Express stores report conflict in conditional writes
+          // as a 200 + an error code of "PreconditionFailed".
+          // This is mapped to RemoteFileChangedException for consistency
+          // with SC_412_PRECONDITION_FAILED handling.
+          return new RemoteFileChangedException(path,
+              operation,
+              exception.getMessage(),
+              exception);
+        }
         // other 200: FALL THROUGH
 
       default:
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
index 559cd49..58d3813 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
@@ -188,6 +188,11 @@ final class S3ClientCreationParameters {
     private String region;
 
     /**
+     * Is this an S3 Express store?
+     */
+    private boolean s3ExpressStore;
+
+    /**
      * Enable S3Express create session.
      */
     private boolean expressCreateSession = S3EXPRESS_CREATE_SESSION_DEFAULT;
@@ -208,6 +213,17 @@ final class S3ClientCreationParameters {
     private boolean isAnalyticsAcceleratorEnabled;
 
     /**
+     * Is the MD5 Header Enabled?
+     */
+    private boolean md5HeaderEnabled;
+
+    /**
+     * Is Checksum calculation Enabled?
+     */
+    private boolean checksumCalculationEnabled;
+
+
+    /**
      * List of execution interceptors to include in the chain
      * of interceptors in the SDK.
      * @return the interceptors list
@@ -255,10 +271,18 @@ public S3ClientCreationParameters withRequesterPays(
       return this;
     }
 
+    /**
+     * Is this a requester pays bucket?
+     * @return true if the bucket is requester pays.
+     */
     public boolean isRequesterPays() {
       return requesterPays;
     }
 
+    /**
+     * Get the credentials.
+     * @return the credential provider.
+     */
     public AwsCredentialsProvider getCredentialSet() {
       return credentialSet;
     }
@@ -275,6 +299,10 @@ public S3ClientCreationParameters withCredentialSet(
       return this;
     }
 
+    /**
+     * Get UA suffix.
+     * @return suffix.
+     */
     public String getUserAgentSuffix() {
       return userAgentSuffix;
     }
@@ -536,6 +564,20 @@ public String getKmsRegion() {
       return kmsRegion;
     }
 
+    public boolean isS3ExpressStore() {
+      return s3ExpressStore;
+    }
+
+    /**
+     * Set builder value.
+     * @param value new value
+     * @return the builder
+     */
+    public S3ClientCreationParameters withS3ExpressStore(final boolean value) {
+      s3ExpressStore = value;
+      return this;
+    }
+
     /**
      * Should s3express createSession be called?
      * @return true if the client should enable createSession.
@@ -564,10 +606,46 @@ public S3ClientCreationParameters withChecksumValidationEnabled(final boolean va
       return this;
     }
 
+    /**
+     * Is checksum validation on every request enabled?
+     * @return true if validation is on every request.
+     */
     public boolean isChecksumValidationEnabled() {
       return checksumValidationEnabled;
     }
 
+    /**
+     * Should MD5 headers be added?
+     * @return true to always add an MD5 header.
+     */
+    public boolean isMd5HeaderEnabled() {
+      return md5HeaderEnabled;
+    }
+
+    /**
+     * Set builder value.
+     * @param value new value
+     * @return the builder
+     */
+    public S3ClientCreationParameters withMd5HeaderEnabled(final boolean value) {
+      md5HeaderEnabled = value;
+      return this;
+    }
+
+    public boolean isChecksumCalculationEnabled() {
+      return checksumCalculationEnabled;
+    }
+
+    /**
+     * Set builder value.
+     * @param value new value
+     * @return the builder
+     */
+    public S3ClientCreationParameters withChecksumCalculationEnabled(final boolean value) {
+      checksumCalculationEnabled = value;
+      return this;
+    }
+
     @Override
     public String toString() {
       return "S3ClientCreationParameters{" +
@@ -580,8 +658,10 @@ public String toString() {
           ", multiPartThreshold=" + multiPartThreshold +
           ", multipartCopy=" + multipartCopy +
           ", region='" + region + '\'' +
+          ", s3ExpressStore=" + s3ExpressStore +
           ", expressCreateSession=" + expressCreateSession +
           ", checksumValidationEnabled=" + checksumValidationEnabled +
+          ", md5HeaderEnabled=" + md5HeaderEnabled +
           '}';
     }
 
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
index 364f780..c8a3864 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
@@ -313,7 +313,8 @@ private CompleteMultipartUploadResponse finalizeMultipartUpload(
     }
     try (AuditSpan span = activateAuditSpan()) {
       CompleteMultipartUploadResponse uploadResult;
-      uploadResult = invoker.retry("Completing multipart upload", destKey,
+      uploadResult = invoker.retry("Completing multipart upload id " + uploadId,
+          destKey,
           true,
           retrying,
           () -> {
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/CustomSdkSigner.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/CustomSdkSigner.java
index b378602..e374a1a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/CustomSdkSigner.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/CustomSdkSigner.java
@@ -104,6 +104,7 @@ public SdkHttpFullRequest sign(SdkHttpFullRequest request,
 
   /**
    * Parse the bucket name from the host.
+   * This does not work for path-style access; the hostname of the endpoint is returned.
    * @param host hostname
    * @return the parsed bucket name; if "kms" is KMS signing.
    */
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/RolePolicies.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/RolePolicies.java
index bd76d83..c2f9ef4 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/RolePolicies.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/RolePolicies.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.fs.s3a.auth;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -199,11 +198,31 @@ private RolePolicies() {
   public static final String S3_RESTORE_OBJECT = "s3:RestoreObject";
 
   /**
-   * S3Express session permission; required unless sessions are disabled.
+   * Everything: {@value}.
+   */
+  public static final String EVERYTHING_ARN = "*";
+
+  /**
+   * All S3Express buckets: {@value}.
+   * S3Express adds another "domain" for permissions: S3 express ARNs and S3 Express operations,
+   * of which createSession is one key operation.
+   * See https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-express-security.html
+   * Note: this wildcard patten came from AWS Q; if it is wrong blame GenerativeAI.
+   */
+  public static final String S3EXPRESS_ALL_BUCKETS = "arn:aws:s3express:*:*:bucket/*--*--x-s3";
+
+  /**
+   * S3Express session permission; required unless sessions are disabled: {@value}.
+   * See https://docs.aws.amazon.com/AmazonS3/latest/API/API_CreateSession.html
    */
   public static final String S3EXPRESS_CREATE_SESSION_POLICY = "s3express:CreateSession";
 
   /**
+   * S3 Express All operations: {@value}.
+   */
+  public static final String S3EXPRESS_ALL_OPERATIONS = "s3express:*";
+
+  /**
    * Actions needed to read a file in S3 through S3A, excluding
    * SSE-KMS.
    */
@@ -224,7 +243,7 @@ private RolePolicies() {
    */
   private static final String[] S3_ROOT_READ_OPERATIONS =
       new String[]{
-          S3_ALL_GET
+          S3_ALL_GET,
       };
 
   public static final List<String> S3_ROOT_READ_OPERATIONS_LIST =
@@ -239,7 +258,7 @@ private RolePolicies() {
   public static final String[] S3_BUCKET_READ_OPERATIONS =
       new String[]{
           S3_ALL_GET,
-          S3_BUCKET_ALL_LIST
+          S3_BUCKET_ALL_LIST,
       };
 
   /**
@@ -281,7 +300,7 @@ private RolePolicies() {
           S3_PUT_OBJECT,
           S3_PUT_OBJECT_ACL,
           S3_DELETE_OBJECT,
-          S3_ABORT_MULTIPART_UPLOAD
+          S3_ABORT_MULTIPART_UPLOAD,
       }));
 
   /**
@@ -293,6 +312,13 @@ private RolePolicies() {
       S3_ALL_OPERATIONS);
 
   /**
+   * S3 Express operations required for operation.
+   */
+  public static final Statement STATEMENT_S3EXPRESS = statement(true,
+      S3EXPRESS_ALL_BUCKETS,
+      S3EXPRESS_ALL_OPERATIONS);
+
+  /**
    * The s3:GetBucketLocation permission is for all buckets, not for
    * any named bucket, which complicates permissions.
    */
@@ -310,8 +336,9 @@ private RolePolicies() {
   public static List<Statement> allowS3Operations(String bucket,
       boolean write) {
     // add the bucket operations for the specific bucket ARN
-    ArrayList<Statement> statements =
+    List<Statement> statements =
         Lists.newArrayList(
+            STATEMENT_S3EXPRESS,
             statement(true,
                 bucketToArn(bucket),
                 S3_GET_BUCKET_LOCATION, S3_BUCKET_ALL_LIST));
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java
index ba1dd40..9ada0d5 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java
@@ -64,7 +64,7 @@ public MagicCommitIntegration(S3AFileSystem owner,
       boolean magicCommitEnabled) {
     super(owner.createStoreContext());
     this.owner = owner;
-    this.magicCommitEnabled = magicCommitEnabled;
+    this.magicCommitEnabled = magicCommitEnabled && owner.isMultipartUploadEnabled();
   }
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChecksumSupport.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChecksumSupport.java
index b14f5f7..474d68c 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChecksumSupport.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChecksumSupport.java
@@ -18,12 +18,12 @@
 
 package org.apache.hadoop.fs.s3a.impl;
 
+import java.util.Locale;
 import java.util.Set;
 
 import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
 import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.ConfigurationHelper;
 
@@ -34,6 +34,22 @@
  */
 public final class ChecksumSupport {
 
+  /**
+   * Special checksum algorithm to declare that no checksum
+   * is required: {@value}.
+   */
+  public static final String NONE = "NONE";
+
+  /**
+   * CRC32C, mapped to CRC32_C algorithm class.
+   */
+  public static final String CRC32C = "CRC32C";
+
+  /**
+   * CRC64NVME, mapped to CRC64_NVME algorithm class.
+   */
+  public static final String CRC64NVME = "CRC64NVME";
+
   private ChecksumSupport() {
   }
 
@@ -43,6 +59,7 @@ private ChecksumSupport() {
   private static final Set<ChecksumAlgorithm> SUPPORTED_CHECKSUM_ALGORITHMS = ImmutableSet.of(
       ChecksumAlgorithm.CRC32,
       ChecksumAlgorithm.CRC32_C,
+      ChecksumAlgorithm.CRC64_NVME,
       ChecksumAlgorithm.SHA1,
       ChecksumAlgorithm.SHA256);
 
@@ -58,14 +75,22 @@ public static ChecksumAlgorithm getChecksumAlgorithm(Configuration conf) {
         CHECKSUM_ALGORITHM,
         ChecksumAlgorithm.class,
         configValue -> {
-          if (StringUtils.isBlank(configValue)) {
+          // default values and handling algorithms names without underscores.
+          String val = configValue == null
+              ? NONE
+              : configValue.toUpperCase(Locale.ROOT);
+          switch (val) {
+          case "":
+          case NONE:
             return null;
-          }
-          if (ChecksumAlgorithm.CRC32_C.toString().equalsIgnoreCase(configValue)) {
-            // In case the configuration value is CRC32C, without underscore.
+          case CRC32C:
             return ChecksumAlgorithm.CRC32_C;
+          case CRC64NVME:
+            return ChecksumAlgorithm.CRC64_NVME;
+          default:
+            throw new IllegalArgumentException("Checksum algorithm is not supported: "
+                + configValue);
           }
-          throw new IllegalArgumentException("Checksum algorithm is not supported: " + configValue);
         });
   }
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
index 8cf435f..d844865 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
@@ -313,4 +313,9 @@ private InternalConstants() {
   public static final String UPLOAD_PROGRESS_LOG_NAME =
       "org.apache.hadoop.fs.s3a.S3AFileSystem.Progress";
 
+  /**
+   * AWS Error code for conditional put failure on s3 express buckets: {@value}.
+   */
+  public static final String PRECONDITION_FAILED = "PreconditionFailed";
+
 }
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/connecting.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/connecting.md
index 783a8a9..3645a65 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/connecting.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/connecting.md
@@ -144,6 +144,9 @@
 Please note that some endpoint and region settings that require cross region access
 are complex and improving over time. Hence, they may be considered unstable.
 
+*Important:* do not use `auto`, `ec2`, or `sdk` as these may be used
+in the future for specific region-binding algorithms.
+
 If you are working with third party stores, please check [third party stores in detail](third_party_stores.html).
 
 ### <a name="timeouts"></a> Network timeouts
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
index 1f65cae..7b77440 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
@@ -931,7 +931,9 @@
   <description>
     Indicates the algorithm used to create the checksum for the object
     to be uploaded to S3. Unset by default. It supports the following values:
-    'CRC32', 'CRC32C', 'SHA1', and 'SHA256'
+    'CRC32', 'CRC32C', 'SHA1', 'SHA256', "CRC64_NVME", "none"
+    The CRC64_NVME option requires aws-crt on the classpath, and is still
+    tangibly slower than CRC32C, which has its own instruction on x86 and ARM.
   </description>
 </property>
 
@@ -1433,6 +1435,9 @@
 1.  Uploads blocks in parallel in background threads.
 1.  Begins uploading blocks as soon as the buffered data exceeds this partition
     size.
+1.  Uses any checksum set in `fs.s3a.create.checksum.algorithm` to calculate an upload
+    checksum on data written; this is included in the file/part upload and verified
+    on the store. This can be a source of third-party store compatibility issues.
 1.  When buffering data to disk, uses the directory/directories listed in
     `fs.s3a.buffer.dir`. The size of data which can be buffered is limited
     to the available disk space.
@@ -1707,16 +1712,7 @@
 The best practise for using this option is to disable multipart purges in
 normal use of S3A, enabling only in manual/scheduled housekeeping operations.
 
-### S3A "fadvise" input policy support
-
-The S3A Filesystem client supports the notion of input policies, similar
-to that of the Posix `fadvise()` API call. This tunes the behavior of the S3A
-client to optimise HTTP GET requests for the different use cases.
-
-See [Improving data input performance through fadvise](./performance.html#fadvise)
-for the details.
-
-##<a name="metrics"></a>Metrics
+## <a name="metrics"></a>Metrics
 
 S3A metrics can be monitored through Hadoop's metrics2 framework. S3A creates
 its own metrics system called s3a-file-system, and each instance of the client
@@ -1754,7 +1750,126 @@
 Note that low-level metrics from the AWS SDK itself are not currently included
 in these metrics.
 
-##<a name="further_reading"></a> Other Topics
+
+## <a name="checksums"></a>Checksums
+
+The S3 Client can use checksums in its requests to an S3 store in a number of ways:
+
+1. To provide a checksum of the request headers.
+2. To provide a `Content-MD5` hash of the request headers
+3. To provide a checksum of data being PUT/POSTed to the store.
+4. To validate data downloaded from the store.
+
+The various options available can impact performance and compatibility.
+To understand the risks and issues here know that:
+* Request checksum generation (item 1) and validation (4) can be done "when required" or "always".
+  The "always" option is stricter, but can result in third-party compatibility issues.
+* Some third-party stores require the `Content-MD5` header and will fail without it (item 2)
+* Data upload checksums (item 3) can be computationally expensive and incompatible with third-party stores
+* The most efficient data upload checksum is CRC32C; there are explicit opcodes for this in x86 and ARM CPUs, with the appropriate implementation circuitry.
+* Data download validation checksums are also computationally expensive.
+
+| Option                             | Purpose                                        | Values  | Default  |
+|------------------------------------|------------------------------------------------|---------|----------|
+| `fs.s3a.request.md5.header`        | Enable MD5 header                              | boolean | `true`   |
+| `fs.s3a.checksum.generation`       | Generate checksums on all requests             | boolean | `false`  |
+| `fs.s3a.checksum.validation`       | Validate checksums on download                 | boolean | `false`  |
+| `fs.s3a.create.checksum.algorithm` | Checksum Algorithm when creating/copying files | `NONE`, `CRC32`, `CRC32C`, `CRC32_C`, `CRC64NVME` , `CRC64_NVME`, `SHA256`, `SHA1`   | `""` |
+
+
+Turning on checksum generation and validation may seem like obvious actions, but consider
+this: you are communicating with an S3 store over an HTTPS channels, which includes
+cryptographically strong HMAC checksums of every block transmitted.
+These are far more robust than the CRC* algorithms, and the computational cost is already
+being paid for: so why add more?
+
+With TLS ensuring the network traffic isn't altered from the moment it is encrypted to when
+it is decrypted, all extra checksum generation/validation does is ensure that there's no
+accidental corruption between the data being generated and uploaded, or between being downloaded and read.
+
+This could potentially deal with memory/buffering/bus issues on the servers.
+However this is what ECC RAM is for.
+If you do suspect requests being corrupted during writing or reading, the options may
+be worth considering.
+As it is, they are off by default to avoid compatibility problems.
+
+Note: if you have a real example of where these checksum options have identified memory corruption,
+please let us know.
+
+### Content-MD5 Header on requests: `fs.s3a.request.md5.header`
+
+Send a `Content-MD5 header` with every request?
+
+This header is required when interacting with some third-party stores.
+It is supported by AWS S3, though has has some unexpected behavior with AWS S3 Express storage
+[issue 6459](https://github.com/aws/aws-sdk-java-v2/issues/6459).
+As that appears to have been fixed in the 2.35.4 SDK release, this option is enabled by default.
+
+### Request checksum generation: `fs.s3a.checksum.generation`
+
+Should checksums be generated for all requests made to the store?
+
+* Incompatible with some third-party stores.
+* If `true` then multipart upload (i.e. large file upload) may fail if `fs.s3a.create.checksum.algorithm`
+  is not set to a valid algorithm (i.e. something other than `NONE`).
+
+Set `fs.s3a.checksum.generation` to `false` (the default value) to avoid these problems.
+
+### Checksum validation `fs.s3a.checksum.validation`
+
+Should the checksums of downloaded data be validated?
+
+This hurts performance and should be only used if considered important.
+
+### Creation checksum `fs.s3a.create.checksum.algorithm`
+
+This is the algorithm to use when checksumming data during file creation and copy.
+
+Options: `NONE`, `CRC32`, `CRC32C`, `CRC32_C`, `CRC64NVME` , `CRC64_NVME`, `SHA256`, `SHA1`
+
+The option `NONE` is new to Hadoop 3.4.3; previously an empty string was required for the same behavior.
+
+The `CRC64NVME`/`CRC64_NVME` option is also new to Hadoop 3.4.3 and requires the `aws-crt` module to be on the classpath, otherwise an error is printed:
+
+```
+java.lang.RuntimeException: Could not load software.amazon.awssdk.crt.checksums.CRC64NVME.
+Add dependency on 'software.amazon.awssdk.crt:aws-crt' module to enable CRC64NVME feature.
+```
+
+Checksum/algorithm incompatibilities may surface as a failure in "Completing multipart upload"`.
+
+First as a failure reported as a "missing part".
+```
+org.apache.hadoop.fs.s3a.AWSBadRequestException: Completing multipart upload id l8itQB.
+5u7TcWqznqbGfTjHv06mxb4IlBNcZiDWrBAS0t1EMJGkr9J1QD2UAwDM5rLUZqypJfWCoPJtySxA3QK9QqKTBdKr3LXYjYJ_r9lRcGdzBRbnIJeI8tBr8yqtS on
+test/testCommitEmptyFile/empty-commit.txt:
+software.amazon.awssdk.services.s3.model.S3Exception: One or more of the specified parts could not be found.
+The part may not have been uploaded, or the specified entity tag may not match the part's entity tag.
+(Service: S3, Status Code: 400, Request ID: AQ0J4B66H626Y3FH,
+Extended Request ID: g1zo25aQCZfqFh3vOzrzOBp9RjJEWmKImRcfWhvaeFHQ2hZo1xTm3GVMD03zN+d+cFB6oNeelNc=)
+(SDK Attempt Count: 1):InvalidPart: One or more of the specified parts could not be found.
+The part may not have been uploaded, or the specified entity tag may not match the part's entity tag.
+(Service: S3, Status Code: 400, Request ID: AQ0J4B66H626Y3FH, Extended Request ID:
+g1zo25aQCZfqFh3vOzrzOBp9RjJEWmKImRcfWhvaeFHQ2hZo1xTm3GVMD03zN+d+cFB6oNeelNc=) (SDK Attempt Count: 1)
+```
+
+Alternatively, as the failure of a multipart upload when a checksum algorithm is set and the part ordering is not in sequence.
+
+```
+org.apache.hadoop.fs.s3a.AWSStatus500Exception:
+  Completing multipart upload id A8rf256dBVbDtIVLr40KaMGKw9DY.rhgNP5zmn1ap97YjPaIO2Ac3XXL_T.2HCtIrGUpx5bdOTgvVeZzVHuoWI4pKv_MeMMVqBHJGP7u_q4PR8AxWvSq0Lsv724HT1fQ
+   on test/testMultipartUploadReverseOrderNonContiguousPartNumbers:
+software.amazon.awssdk.services.s3.model.S3Exception: We encountered an internal error.
+Please try again.
+(Service: S3, Status Code: 500, Request ID: WTBY2FX76Q5F5YWB,
+Extended Request ID: eWQWk8V8rmVmKImWVCI2rHyFS3XQSPgIkjfAyzzZCgVgyeRqox8mO8qO4ODMB6IUY0+rYqqsnOX2zXiQcRzFlb9p3nSkEEc+T0CYurLaH28=)
+(SDK Attempt Count: 3)
+```
+
+This is only possible through the FileSystem multipart API; normal data writes including
+those through the magic committer will not encounter it,
+
+## <a name="other_topics"></a> Other Topics
 
 ### <a name="distcp"></a> Copying Data with distcp
 
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/reading.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/reading.md
index fa4572b..6614ad4 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/reading.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/reading.md
@@ -163,6 +163,15 @@
 faster to use the classic stream and its parallel reads.
 
 
+## S3A "fadvise" input policy support: `fs.s3a.experimental.input.fadvise`
+
+The S3A Filesystem client supports the notion of input policies, similar
+to that of the Posix `fadvise()` API call. This tunes the behavior of the S3A
+client to optimise HTTP GET requests for the different use cases.
+
+See [Improving data input performance through fadvise](./performance.html#fadvise)
+for the details.
+
 ## Developer Topics
 
 ### Stream IOStatistics
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md
index 4176b20..b029162 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md
@@ -585,6 +585,17 @@
     <name>test.fs.s3a.performance.enabled</name>
     <value>false</value>
   </property>
+
+  <!--
+   If the store reports errors when trying to list/abort completed multipart uploads,
+   expect failures in ITestUploadRecovery and ITestS3AContractMultipartUploader.
+   The tests can be reconfigured to expect failure.
+   Note how this can be set as a per-bucket option.
+  -->
+  <property>
+    <name>fs.s3a.ext.test.multipart.commit.consumes.upload.id</name>
+    <value>true</value>
+  </property>
 ```
 
 See [Third Party Stores](third_party_stores.html) for more on this topic.
@@ -736,9 +747,23 @@
   </property>
 ```
 
+### Changing expectations on multipart upload retries: `ITestS3AContractMultipartUploader` and `ITestUploadRecovery`
+
+If the store reports errors when trying to list/abort completed multipart uploads,
+expect failures in `ITestUploadRecovery` and `ITestS3AContractMultipartUploader`.
+The tests can be reconfigured to expect failure by setting the option
+`fs.s3a.ext.test.multipart.commit.consumes.upload.id` to true.
+
+Note how this can be set as a per-bucket option.
+
+```xml
+  <property>
+    <name>fs.s3a.ext.test.multipart.commit.consumes.upload.id</name>
+    <value>true</value>
+  </property>
+```
 ### Tests which may fail (and which you can ignore)
 
-* `ITestS3AContractMultipartUploader` tests `testMultipartUploadAbort` and `testSingleUpload` raising `FileNotFoundException`
 * `ITestS3AMiscOperations.testEmptyFileChecksums`: if the FS encrypts data always.
 
 ## <a name="debugging"></a> Debugging Test failures
@@ -837,10 +862,15 @@
 * `getFileSystem()` returns the S3A Filesystem bonded to the contract test Filesystem
 defined in `fs.s3a.contract.test`
 * will automatically skip all tests if that URL is unset.
-* Extends  `AbstractFSContractTestBase` and `Assert` for all their methods.
+* Extends  `AbstractFSContractTestBase`
+* Uses AssertJ for all assertions, _not_ those of JUnit5.
 
 Having shared base classes may help reduce future maintenance too. Please
-use them/
+use them.
+
+We adopted AssertJ assertions long before the move to JUnit5.
+While there are still many tests with legacy JUnit 1.x assertions, all new test cases
+should use AssertJ assertions and MUST NOT use JUnit5.
 
 ### Secure
 
@@ -873,7 +903,7 @@
 URL can be overridden for testing elsewhere.
 
 
-### Works With Other S3 Stored
+### Works With Other S3 Stores
 
 Don't assume AWS S3 US-East only, do allow for working with external S3 implementations.
 Those may be behind the latest S3 API features, not support encryption, session
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/third_party_stores.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/third_party_stores.md
index f6fea93..0336efa 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/third_party_stores.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/third_party_stores.md
@@ -42,19 +42,9 @@
 * Bucket lifecycle rules to clean up pending uploads.
 * Support for multipart uploads.
 * Conditional file creation. (`fs.s3a.create.conditional.enabled = false`)
+* Variations in checksum calculation on uploads.
+* Requirement for Content-MD5 headers.
 
-### Disabling Change Detection
-
-The (default) etag-based change detection logic expects stores to provide an Etag header in HEAD/GET requests,
-and to support it as a precondition in subsequent GET and COPY calls.
-If a store does not do this, disable the checks.
-
-```xml
-<property>
-  <name>fs.s3a.change.detection.mode</name>
-  <value>none</value>
-</property>
-```
 ## Connecting to a third party object store over HTTPS
 
 The core setting for a third party store is to change the endpoint in `fs.s3a.endpoint`.
@@ -65,10 +55,11 @@
 
 The v4 signing algorithm requires a region to be set in `fs.s3a.endpoint.region`.
 A non-empty value is generally sufficient, though some deployments may require
-a specific value. 
+a specific value.
 
-*Important:* do not use `auto` or `sdk` as these may be used
-in the future for specific region binding algorithms.
+*Important:* do not use `auto`, `ec2`, or `sdk` as these may be used
+in the future for specific region binding algorithms; while `null`
+can be mis-interpreted.
 
 Finally, assuming the credential source is the normal access/secret key
 then these must be set, either in XML or (preferred) in a JCEKS file.
@@ -87,7 +78,7 @@
 
   <property>
     <name>fs.s3a.endpoint.region</name>
-    <value>anything</value>
+    <value>anything except: sdk, auto, ec2</value>
   </property>
 
   <property>
@@ -104,7 +95,14 @@
 
 If per-bucket settings are used here, then third-party stores and credentials may be used alongside an AWS store.
 
+### region naming
 
+AWS SDK requires the name of a region is supplied for signing, and that region match the endpoint used.
+
+Third-party stores don't normally care about the name of a region, *only that a region is supplied*.
+
+You should set `fs.s3a.endpoint.region` to anything except the following reserved names: `sdk`, `ec2` and `auto`.
+We have plans for those.
 
 ## Other issues
 
@@ -120,7 +118,7 @@
 
 #### S3Guard uploads command
 
-This can be executed on a schedule, or manually
+This can be executed on a schedule, or manually:
 
 ```
 hadoop s3guard uploads -abort -force s3a://bucket/
@@ -174,10 +172,78 @@
   </property>
 ```
 
+## Controlling Upload Checksums and MD5 Headers
+
+It may be necessary to change checksums of uploads by
+1. Enabling the attachment of a `Content-MD5 header` in requests
+2. Restricting checksum generation to only when required.
+
+```xml
+  <property>
+    <name>fs.s3a.request.md5.header</name>
+    <value>true</value>
+    <description>Enable calculation and inclusion of an MD5 HEADER on data upload operations</description>
+  </property>
+
+  <property>
+    <name>fs.s3a.checksum.generation</name>
+    <value>false</value>
+    <description>Calculate and attach a message checksum on every operation.</description>
+  </property>
+
+  <property>
+    <name>fs.s3a.checksum.validation</name>
+    <value>false</value>
+    <description>Validate data checksums on download</description>
+  </property>
+```
+
+These options are set for best compatibility and performance by default; they may need tuning for specific stores.
+
+See [checksums](index.html#checksums) for more details.
+
+### Disabling Change Detection
+
+The (default) etag-based change detection logic expects stores to provide an Etag header in HEAD/GET requests,
+and to support it as a precondition in subsequent GET and COPY calls.
+If a store does not do this, disable the checks.
+
+```xml
+<property>
+  <name>fs.s3a.change.detection.mode</name>
+  <value>none</value>
+</property>
+```
+
+## Handling Null Etags
+
+Some object stores do not support etags, that is: they return `null` or an empty string as the etag of an object on both HEAD and GET requests.
+
+This breaks version management in the classic input stream *and* metadata caching in the analytics stream.
+
+To work with such a store:
+* Set `fs.s3a.input.stream.type` to `classic`
+* Set `fs.s3a.change.detection.mode` to `none`
+
+```xml
+<property>
+  <name>fs.s3a.input.stream.type</name>
+  <value>classic</value>
+</property>
+
+<property>
+  <name>fs.s3a.change.detection.mode</name>
+  <value>none</value>
+</property>
+```
+
+Note: the [cloudstore](https://github.com/steveloughran/cloudstore) `etag` command will retrieve and print an object's etag,
+and can be used to help debug this situation.
+The etag value of a newly created object SHOULD be a non-empty string.
 
 # Troubleshooting
 
-The most common problem when talking to third-party stores are
+The most common problem when talking to third-party stores are:
 
 1. The S3A client is still configured to talk to the AWS S3 endpoint. This leads to authentication failures and/or reports that the bucket is unknown.
 2. Path access has not been enabled, the client is generating a host name for the target bucket and it does not exist.
@@ -185,11 +251,12 @@
 4. JVM HTTPS settings include the certificates needed to negotiate a TLS connection with the store.
 
 
-## How to improve troubleshooting
+## How to Troubleshoot problems
 
-### log more network info
+### Log More Network Info
 
-There are some very low level logs.
+There are some very low level logs which can be printed.
+
 ```properties
 # Log all HTTP requests made; includes S3 interaction. This may
 # include sensitive information such as account IDs in HTTP headers.
@@ -203,7 +270,7 @@
 log4j.logger.io.netty.handler.codec.http2.Http2FrameLogger=DEBUG
 ```
 
-### Cut back on retries, shorten timeouts
+### Reduce on Retries; Shorten Timeouts
 
 By default, there's a lot of retries going on in the AWS connector (which even retries on DNS failures)
 and in the S3A code which invokes it.
@@ -263,7 +330,7 @@
 There's an external utility, [cloudstore](https://github.com/steveloughran/cloudstore) whose [storediag](https://github.com/steveloughran/cloudstore#command-storediag) exists to debug the connection settings to hadoop cloud storage.
 
 ```bash
-hadoop jar cloudstore-1.0.jar storediag s3a://nonexistent-bucket-example/
+hadoop jar cloudstore-1.1.jar storediag s3a://nonexistent-bucket-example/
 ```
 
 The main reason it's not an ASF release is that it allows for a rapid release cycle, sometimes hours; if anyone doesn't trust
@@ -414,7 +481,43 @@
   </property>
 ```
 
-# Connecting to Google Cloud Storage through the S3A connector
+# Settings for Specific Stores
+
+## Dell ECS through the S3A Connector
+
+As of November 2025 and the 2.35.4 AWS SDK, the settings needed to interact with Dell ECS
+at [ECS Test Drive](https://portal.ecstestdrive.com/) were
+
+```xml
+<property>
+  <name>fs.s3a.endpoint.region</name>
+  <value>dell</value>
+  <description>arbitrary name other than sdk, ec2, auto or null</description>
+</property>
+
+<property>
+  <name>fs.s3a.path.style.access</name>
+  <value>true</value>
+</property>
+
+<property>
+  <name>fs.s3a.create.conditional.enabled</name>
+  <value>false</value>
+</property>
+
+<property>
+  <name>fs.s3a.bucket.request.md5.header</name>
+  <value>true</value>
+  <description>Enable calculation and inclusion of an MD5 HEADER on data upload operations</description>
+</property>
+
+<property>
+  <name>fs.s3a.checksum.generation</name>
+  <value>false</value>
+</property>
+```
+
+## Google Cloud Storage through the S3A connector
 
 It *is* possible to connect to google cloud storage through the S3A connector.
 However, Google provide their own [Cloud Storage connector](https://cloud.google.com/dataproc/docs/concepts/connectors/cloud-storage).
@@ -443,63 +546,68 @@
 <configuration>
 
   <property>
-    <name>fs.s3a.bucket.gcs-container.access.key</name>
+    <name>fs.s3a.access.key</name>
     <value>GOOG1EZ....</value>
   </property>
 
   <property>
-    <name>fs.s3a.bucket.gcs-container.secret.key</name>
+    <name>fs.s3a.secret.key</name>
     <value>SECRETS</value>
   </property>
 
   <property>
-    <name>fs.s3a.bucket.gcs-container.endpoint</name>
+    <name>fs.s3a.endpoint</name>
     <value>https://storage.googleapis.com</value>
   </property>
 
+
+  <!-- any value except sdk, auto and ec2 is allowed here, using "gcs" is more informative -->
   <property>
-    <name>fs.s3a.bucket.gcs-container.bucket.probe</name>
+    <name>fs.s3a.endpoint.region</name>
+    <value>gcs</value>
+  </property>
+
+  <property>
+    <name>fs.s3a.path.style.access</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.s3a.checksum.generation</name>
+    <value>false</value>
+    <description>Calculate and attach a message checksum on every operation. (default: true)</description>
+  </property>
+
+  <property>
+    <name>fs.s3a.bucket.probe</name>
     <value>0</value>
   </property>
 
   <property>
-    <name>fs.s3a.bucket.gcs-container.list.version</name>
+    <name>fs.s3a.list.version</name>
     <value>1</value>
   </property>
 
   <property>
-    <name>fs.s3a.bucket.gcs-container.multiobjectdelete.enable</name>
+    <name>fs.s3a.multiobjectdelete.enable</name>
     <value>false</value>
   </property>
 
   <property>
-    <name>fs.s3a.bucket.gcs-container.path.style.access</name>
-    <value>true</value>
-  </property>
-
-  <!-- any value is allowed here, using "gcs" is more informative -->
-  <property>
-    <name>fs.s3a.bucket.gcs-container.endpoint.region</name>
-    <value>gcs</value>
-  </property>
-
-  <!-- multipart uploads trigger 400 response-->
-  <property>
-    <name>fs.s3a.multipart.uploads.enabled</name>
+    <name>fs.s3a.committer.magic.enabled</name>
     <value>false</value>
   </property>
-  
+
    <property>
     <name>fs.s3a.optimized.copy.from.local.enabled</name>
     <value>false</value>
   </property>
-  
+
   <!-- No support for conditional file creation -->
   <property>
     <name>fs.s3a.create.conditional.enabled</name>
     <value>false</value>
   </property>
-  
 </configuration>
 ```
 
@@ -531,3 +639,4 @@
 
 _Note_ If anyone is set up to test this regularly, please let the hadoop developer team know if regressions do surface,
 as it is not a common test configuration.
+We do use it to help test compatibility during SDK updates.
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
index 151ee5b..d20e2d9 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
@@ -243,7 +243,7 @@
 the interface `software.amazon.awssdk.auth.credentials.AwsCredentialsProvider`.
 
 ```
-InstantiationIOException: `s3a://stevel-gcs/': Class org.apache.hadoop.fs.s3a.S3ARetryPolicy does not implement
+InstantiationIOException: `s3a://gcs/': Class org.apache.hadoop.fs.s3a.S3ARetryPolicy does not implement
  software.amazon.awssdk.auth.credentials.AwsCredentialsProvider (configuration key fs.s3a.aws.credentials.provider)
         at org.apache.hadoop.fs.s3a.impl.InstantiationIOException.isNotInstanceOf(InstantiationIOException.java:128)
         at org.apache.hadoop.fs.s3a.S3AUtils.getInstanceFromReflection(S3AUtils.java:604)
@@ -354,7 +354,7 @@
 
 This is an obscure failure which was encountered as part of
 [HADOOP-19221](https://issues.apache.org/jira/browse/HADOOP-19221) : an upload of part of a file could not
-be succesfully retried after a failure was reported on the first attempt.
+be successfully retried after a failure was reported on the first attempt.
 
 1. It was only encountered during uploading files via the Staging Committers
 2. And is a regression in the V2 SDK.
@@ -364,7 +364,7 @@
 * If it is encountered on a release without the fix, please upgrade.
 
 It may be that the problem arises in the AWS SDK's "TransferManager", which is used for a
-higher performance upload of data from the local fileystem. If this is the case. disable this feature:
+higher performance upload of data from the local filesystem. If this is the case. disable this feature:
 ```
 <property>
   <name>fs.s3a.optimized.copy.from.local.enabled</name>
@@ -409,6 +409,48 @@
 </property>
 ```
 
+### <a name="content-sha-256"></a> Status Code 400 "XAmzContentSHA256Mismatch: The Content-SHA256 you specified did not match what we receive"
+
+Seen when working with a third-party store
+
+```
+org.apache.hadoop.fs.s3a.AWSBadRequestException: PUT 0-byte object  on test:
+software.amazon.awssdk.services.s3.model.S3Exception:
+The Content-SHA256 you specified did not match what we received
+(Service: S3, Status Code: 400, Request ID: 0c07c87d:196d43d824a:d7bca:eeb, Extended Request ID: 2af53adb49ffb141a32b534ad7ffbdf33a247f6b95b422011e0b109649d1fab7) (SDK Attempt Count: 1):
+XAmzContentSHA256Mismatch: The Content-SHA256 you specified did not match what we received
+```
+
+This happens when a file create checksum has been enabled but the store does not support it/support it consistently with AWS S3.
+
+```xml
+  <property>
+    <name>fs.s3a.create.checksum.algorithm</name>
+    <value>none</value>
+  </property>
+```
+
+### <a name="content-sha-256"></a> Status Code 400 "x-amz-sdk-checksum-algorithm specified, but no corresponding x-amz-checksum-* or x-amz-trailer headers were found"
+
+```
+org.apache.hadoop.fs.s3a.AWSBadRequestException: PUT 0-byte object  on test
+software.amazon.awssdk.services.s3.model.InvalidRequestException
+x-amz-sdk-checksum-algorithm specified, but no corresponding x-amz-checksum-* or x-amz-trailer headers were found.
+  (Service: S3, Status Code: 400, Request ID: 012929bd17000198c8bc82d20509eecd6df79b1a, Extended Request ID: P9bq0Iv) (SDK Attempt Count: 1):
+```
+
+The checksum algorithm to be used is not one supported by the store.
+In particular, the value `unknown_to_sdk_version` appears to cause it.
+
+```xml
+  <property>
+    <name>fs.s3a.create.checksum.algorithm</name>
+    <value>unknown_to_sdk_version</value>
+  </property>
+```
+
+Fix: use a checksum the store knows about.
+
 ## <a name="access_denied"></a> Access Denied
 
 HTTP error codes 401 and 403 are mapped to `AccessDeniedException` in the S3A connector.
@@ -436,6 +478,9 @@
 
 ```
 
+If working with a third-party bucket, verify the `fs.s3a.endpoint` setting
+points to the third-party store.
+
 ###  <a name="access_denied_disabled"></a> `AccessDeniedException` All access to this object has been disabled
 
 Caller has no permission to access the bucket at all.
@@ -560,13 +605,80 @@
 
 If you want to access the file with S3A after writes, do not set `fs.s3a.create.storage.class` to `glacier` or `deep_archive`.
 
+### <a name="SignatureDoesNotMatch"></a>`AccessDeniedException` with `SignatureDoesNotMatch` on a third party bucket.
+
+This can surface when trying to interact, especially write data, to a third-party bucket
+
+```
+ Writing Object on example-file: software.amazon.awssdk.services.s3.model.S3Exception: Invalid argument. (Service: S3, Status Code: 403, Request ID: null) (SDK Attempt Count: 1):SignatureDoesNotMatch
+```
+
+The store does not recognize checksum calculation on every operation.
+Fix: disable it by setting `fs.s3a.checksum.generation` to `false`.
+
+```xml
+<property>
+  <name>fs.s3a.checksum.generation</name>
+  <value>false</value>
+  <description>Calculate and attach a message checksum on every operation. (default: false)</description>
+</property>
+```
+
+Full stack
+
+```
+> bin/hadoop fs -touchz s3a://gcs/example-file
+2025-10-21 16:23:27,642 [main] WARN  s3a.S3ABlockOutputStream (S3ABlockOutputStream.java:progressChanged(1335)) - Transfer failure of block FileBlock{index=1, destFile=/tmp/hadoop-stevel/s3a/s3ablock-0001-1358390699869033998.tmp, state=Upload, dataSize=0, limit=-1}
+2025-10-21 16:23:27,645 [main] DEBUG shell.Command (Command.java:displayError(481)) - touchz failure
+java.nio.file.AccessDeniedException: example-file: Writing Object on example-file: software.amazon.awssdk.services.s3.model.S3Exception: Invalid argument. (Service: S3, Status Code: 403, Request ID: null) (SDK Attempt Count: 1):SignatureDoesNotMatch
+   at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:271)
+   at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:124)
+   at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$4(Invoker.java:376)
+   at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:468)
+   at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:372)
+   at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:347)
+   at org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:210)
+   at org.apache.hadoop.fs.s3a.WriteOperationHelper.putObject(WriteOperationHelper.java:534)
+   at org.apache.hadoop.fs.s3a.S3ABlockOutputStream.putObject(S3ABlockOutputStream.java:726)
+   at org.apache.hadoop.fs.s3a.S3ABlockOutputStream.close(S3ABlockOutputStream.java:518)
+   at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:77)
+   at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
+   at org.apache.hadoop.fs.shell.TouchCommands$Touchz.touchz(TouchCommands.java:89)
+   at org.apache.hadoop.fs.shell.TouchCommands$Touchz.processNonexistentPath(TouchCommands.java:85)
+   at org.apache.hadoop.fs.shell.Command.processArgument(Command.java:303)
+   at org.apache.hadoop.fs.shell.Command.processArguments(Command.java:285)
+   at org.apache.hadoop.fs.shell.FsCommand.processRawArguments(FsCommand.java:121)
+   at org.apache.hadoop.fs.shell.Command.run(Command.java:192)
+   at org.apache.hadoop.fs.FsShell.run(FsShell.java:327)
+   at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:82)
+   at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:97)
+   at org.apache.hadoop.fs.FsShell.main(FsShell.java:390)
+Caused by: software.amazon.awssdk.services.s3.model.S3Exception: Invalid argument. (Service: S3, Status Code: 403, Request ID: null) (SDK Attempt Count: 1)
+   at software.amazon.awssdk.services.s3.model.S3Exception$BuilderImpl.build(S3Exception.java:113)
+   at software.amazon.awssdk.services.s3.model.S3Exception$BuilderImpl.build(S3Exception.java:61)
+...
+   at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:53)
+   at software.amazon.awssdk.services.s3.DefaultS3Client.putObject(DefaultS3Client.java:11883)
+   at software.amazon.awssdk.services.s3.DelegatingS3Client.lambda$putObject$89(DelegatingS3Client.java:9716)
+   at software.amazon.awssdk.services.s3.internal.crossregion.S3CrossRegionSyncClient.invokeOperation(S3CrossRegionSyncClient.java:67)
+   at software.amazon.awssdk.services.s3.DelegatingS3Client.putObject(DelegatingS3Client.java:9716)
+   at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$putObjectDirect$14(S3AFileSystem.java:3332)
+   at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfSupplier(IOStatisticsBinding.java:650)
+   at org.apache.hadoop.fs.s3a.S3AFileSystem.putObjectDirect(S3AFileSystem.java:3330)
+   at org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$putObject$7(WriteOperationHelper.java:535)
+   at org.apache.hadoop.fs.store.audit.AuditingFunctions.lambda$withinAuditSpan$0(AuditingFunctions.java:62)
+   at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:122)
+   ... 20 more
+   touchz: example-file: Writing Object on example-file: software.amazon.awssdk.services.s3.model.S3Exception: Invalid argument. (Service: S3, Status Code: 403, Request ID: null) (SDK Attempt Count: 1):SignatureDoesNotMatch
+```
+
 ### <a name="no_region_session_credentials"></a> "Unable to find a region via the region provider chain." when using session credentials.
 
 Region must be provided when requesting session credentials, or an exception will be thrown with the
 message:
 
 ```
- Unable to find a region via the region provider
+Unable to find a region via the region provider
 chain. Must provide an explicit region in the builder or setup environment to supply a region.
 ```
 
@@ -1241,6 +1353,21 @@
 
 2. This setting ensures that all pending MPUs are aborted before the directory object is deleted, which is a requirement specific to S3 Express store buckets.
 
+## Status Code: 200 + "PreconditionFailed: At least one of the pre-conditions you specified did not hold"
+
+```
+software.amazon.awssdk.services.s3.model.S3Exception: At least one of the pre-conditions you specified did not hold
+(Service: S3, Status Code: 200, Request ID: 01a396cff3000198cc0439e40509a95e33467bdc, Extended Request ID: TZrsG8pBzlmXoV) (SDK Attempt Count: 1):
+PreconditionFailed: At least one of the pre-conditions you specified did not hold
+```
+
+An attempt to write to S3Express bucket using conditional overwrite failed because another process was writing at the same time.
+
+Conditional overwrite during file creation is used when conditional creation has been enabled (`fs.s3a.create.conditional.enabled`).
+This is true by default.
+
+* A file is created using the `createFile()` API with the option `fs.option.create.conditional.overwrite` set to true.
+* File create performance has been enabled with (`fs.s3a.performance.flags` including `create` or being `*`)
 
 ### Application hangs after reading a number of files
 
@@ -1333,6 +1460,39 @@
 
 Something has been trying to write data to "/".
 
+### "Unable to create OutputStream with the given multipart upload and buffer configuration."
+
+This error is raised when an attemt it made to write to a store with
+`fs.s3a.multipart.uploads.enabled` set to `false` and `fs.s3a.fast.upload.buffer` set to array.
+
+This is pre-emptively disabled before a write of so much data takes place that the process runs out of heap space.
+
+If the store doesn't support multipart uploads, _use disk for buffering_.
+Nothing else is safe to use as it leads to a state where small jobs work, but those which generate large amounts of data fail.
+
+```xml
+<property>
+  <name>fs.s3a.fast.upload.buffer</name>
+  <value>disk</value>
+</property>
+```
+
+```
+org.apache.hadoop.fs.PathIOException: `s3a://gcs/a2a8c3e4-5788-40c0-ad66-fe3fe63f4507': Unable to create OutputStream with the given multipart upload and buffer configuration.
+    at org.apache.hadoop.fs.s3a.S3AUtils.validateOutputStreamConfiguration(S3AUtils.java:985)
+    at org.apache.hadoop.fs.s3a.S3AFileSystem.innerCreateFile(S3AFileSystem.java:2201)
+    at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$create$5(S3AFileSystem.java:2068)
+    at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration(IOStatisticsBinding.java:546)
+    at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:527)
+    at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:448)
+    at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2881)
+    at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2900)
+    at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:2067)
+    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1233)
+    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1210)
+    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1091)
+```
+
 ## <a name="best"></a> Best Practises
 
 ### <a name="logging"></a> Enabling low-level logging
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMkdirWithCreatePerf.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMkdirWithCreatePerf.java
index 4570320..42d175a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMkdirWithCreatePerf.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMkdirWithCreatePerf.java
@@ -31,6 +31,7 @@
 import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
 import static org.apache.hadoop.fs.s3a.S3ATestConstants.KEY_PERFORMANCE_TESTS_ENABLED;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.setPerformanceFlags;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotEnabled;
 
@@ -42,9 +43,9 @@ public class ITestS3AContractMkdirWithCreatePerf extends AbstractContractMkdirTe
 
   @Override
   protected Configuration createConfiguration() {
-    return setPerformanceFlags(
-        super.createConfiguration(),
-        "create,mkdir");
+    final Configuration conf = super.createConfiguration();
+    disableFilesystemCaching(conf);
+    return setPerformanceFlags(conf, "create,mkdir");
   }
 
   @Override
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java
index 0afdf20..6d98388 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java
@@ -18,10 +18,13 @@
 
 package org.apache.hadoop.fs.contract.s3a;
 
+import java.io.FileNotFoundException;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.contract.AbstractContractMultipartUploaderTest;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.impl.ChecksumSupport;
 import org.apache.hadoop.test.tags.IntegrationTest;
 import org.apache.hadoop.test.tags.ScaleTest;
 
@@ -29,15 +32,23 @@
 import org.junit.jupiter.api.Test;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
+import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_ALGORITHM;
+import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_GENERATION;
+import static org.apache.hadoop.fs.s3a.S3ATestConstants.DEFAULT_MULTIPART_COMMIT_CONSUMES_UPLOAD_ID;
 import static org.apache.hadoop.fs.s3a.S3ATestConstants.DEFAULT_SCALE_TESTS_ENABLED;
 import static org.apache.hadoop.fs.s3a.S3ATestConstants.KEY_HUGE_PARTITION_SIZE;
 import static org.apache.hadoop.fs.s3a.S3ATestConstants.KEY_SCALE_TESTS_ENABLED;
+import static org.apache.hadoop.fs.s3a.S3ATestConstants.MULTIPART_COMMIT_CONSUMES_UPLOAD_ID;
 import static org.apache.hadoop.fs.s3a.S3ATestConstants.SCALE_TEST_TIMEOUT_MILLIS;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeMultipartUploads;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeNotS3ExpressFileSystem;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBool;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBytes;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
+import static org.apache.hadoop.fs.s3a.impl.ChecksumSupport.getChecksumAlgorithm;
 import static org.apache.hadoop.fs.s3a.scale.AbstractSTestS3AHugeFiles.DEFAULT_HUGE_PARTITION_SIZE;
 
 /**
@@ -54,6 +65,8 @@ public class ITestS3AContractMultipartUploader extends
 
   private int partitionSize;
 
+  private boolean mpuCommitConsumesUploadId;
+
   /**
    * S3 requires a minimum part size of 5MB (except the last part).
    * @return 5MB+ value
@@ -95,7 +108,18 @@ protected boolean supportsConcurrentUploadsToSamePath() {
 
   @Override
   protected boolean finalizeConsumesUploadIdImmediately() {
-    return false;
+    return mpuCommitConsumesUploadId;
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    final Configuration conf = super.createConfiguration();
+    // use whatever the default checksum generation option is.
+    removeBaseAndBucketOverrides(conf, CHECKSUM_GENERATION, CHECKSUM_ALGORITHM);
+    conf.setBoolean(CHECKSUM_GENERATION, false);
+    conf.set(CHECKSUM_ALGORITHM, ChecksumSupport.NONE);
+    disableFilesystemCaching(conf);
+    return conf;
   }
 
   @BeforeEach
@@ -110,9 +134,16 @@ public void setup() throws Exception {
     assume("Scale test disabled: to enable set property " +
             KEY_SCALE_TESTS_ENABLED,
         enabled);
+    final Configuration fsConf = getFileSystem().getConf();
+    assumeMultipartUploads(fsConf);
     partitionSize = (int) getTestPropertyBytes(conf,
         KEY_HUGE_PARTITION_SIZE,
         DEFAULT_HUGE_PARTITION_SIZE);
+    mpuCommitConsumesUploadId = fsConf.getBoolean(
+        MULTIPART_COMMIT_CONSUMES_UPLOAD_ID,
+        DEFAULT_MULTIPART_COMMIT_CONSUMES_UPLOAD_ID);
+    LOG.info("{} = {}", MULTIPART_COMMIT_CONSUMES_UPLOAD_ID, mpuCommitConsumesUploadId);
+    LOG.info("{} = {}", CHECKSUM_ALGORITHM, getChecksumAlgorithm(fsConf));
   }
 
   /**
@@ -134,6 +165,7 @@ public void testMultipartUploadReverseOrder() throws Exception {
   @Override
   public void testMultipartUploadReverseOrderNonContiguousPartNumbers() throws Exception {
     assumeNotS3ExpressFileSystem(getFileSystem());
+    final Configuration fsConf = getFileSystem().getConf();
     super.testMultipartUploadReverseOrderNonContiguousPartNumbers();
   }
 
@@ -149,4 +181,17 @@ public void testConcurrentUploads() throws Throwable {
         "Analytics Accelerator currently does not support reading of over written files");
     super.testConcurrentUploads();
   }
+
+  @Test
+  @Override
+  public void testMultipartUploadAbort() throws Exception {
+    try {
+      super.testMultipartUploadAbort();
+    } catch (FileNotFoundException e) {
+      LOG.info("Multipart upload not found in abort()."
+          + " This is common on third-party stores: {}",
+          e.toString());
+      LOG.debug("Exception: ", e);
+    }
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java
index 8f8f90f..be0d2cc 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java
@@ -164,26 +164,33 @@ public void testMultiRowGroupParquet() throws Throwable {
 
     FileStatus fileStatus = getFileSystem().getFileStatus(dest);
 
-    byte[] buffer = new byte[3000];
+    final int size = 3000;
+    byte[] buffer = new byte[size];
+    int readLimit = Math.min(size, (int) fileStatus.getLen());
     IOStatistics ioStats;
 
+    final IOStatistics fsIostats = getFileSystem().getIOStatistics();
+    final long initialAuditCount = fsIostats.counters()
+        .getOrDefault(AUDIT_REQUEST_EXECUTION, 0L);
+
     try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
       ioStats = inputStream.getIOStatistics();
-      inputStream.readFully(buffer, 0, (int) fileStatus.getLen());
+      inputStream.readFully(buffer, 0, readLimit);
     }
 
     verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
 
     try (FSDataInputStream inputStream = getFileSystem().openFile(dest)
+        .withFileStatus(fileStatus)
         .must(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_PARQUET)
         .build().get()) {
       ioStats = inputStream.getIOStatistics();
-      inputStream.readFully(buffer, 0, (int) fileStatus.getLen());
+      inputStream.readFully(buffer, 0, readLimit);
     }
 
     verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
 
-    verifyStatisticCounterValue(getFileSystem().getIOStatistics(), AUDIT_REQUEST_EXECUTION, 4);
+    verifyStatisticCounterValue(fsIostats, AUDIT_REQUEST_EXECUTION, initialAuditCount + 2);
   }
 
   @Test
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputArray.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputArray.java
index 18f665e..0e43ee3 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputArray.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputArray.java
@@ -28,6 +28,7 @@
 import org.apache.hadoop.io.IOUtils;
 
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
@@ -36,6 +37,7 @@
 
 import static org.apache.hadoop.fs.StreamCapabilities.ABORTABLE_STREAM;
 import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotEnabled;
 import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.assertCompleteAbort;
 import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.assertNoopAbort;
 
@@ -66,6 +68,16 @@ protected Configuration createConfiguration() {
     return conf;
   }
 
+  @Override
+  @BeforeEach
+  public void setup() throws Exception {
+    super.setup();
+
+    skipIfNotEnabled(getFileSystem().getConf(),
+        MULTIPART_UPLOADS_ENABLED,
+        "Store has disabled multipart uploads; skipping tests");
+  }
+
   protected String getBlockOutputBufferName() {
     return FAST_UPLOAD_BUFFER_ARRAY;
   }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABucketExistence.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABucketExistence.java
index 8b9a202..a7010ef 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABucketExistence.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABucketExistence.java
@@ -44,7 +44,11 @@
 import static org.apache.hadoop.fs.s3a.Constants.FS_S3A;
 import static org.apache.hadoop.fs.s3a.Constants.PATH_STYLE_ACCESS;
 import static org.apache.hadoop.fs.s3a.Constants.S3A_BUCKET_PROBE;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.S3AUtils.propagateBucketOptions;
+import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.isAwsEndpoint;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
 /**
@@ -59,6 +63,15 @@ public class ITestS3ABucketExistence extends AbstractS3ATestBase {
 
   private final URI uri = URI.create(FS_S3A + "://" + randomBucket + "/");
 
+  @Override
+  protected Configuration createConfiguration() {
+    final Configuration conf = super.createConfiguration();
+    String endpoint = propagateBucketOptions(conf, getTestBucketName(conf)).get(ENDPOINT, "");
+    assume("Skipping existence probes",
+        isAwsEndpoint(endpoint));
+    return conf;
+  }
+
   @SuppressWarnings("deprecation")
   @Test
   public void testNoBucketProbing() throws Exception {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AChecksum.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AChecksum.java
index 7526646..665703c 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AChecksum.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AChecksum.java
@@ -19,9 +19,13 @@
 package org.apache.hadoop.fs.s3a;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
 
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedClass;
+import org.junit.jupiter.params.provider.MethodSource;
 import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm;
 import software.amazon.awssdk.services.s3.model.ChecksumMode;
 import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
@@ -31,8 +35,14 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.impl.ChecksumSupport;
 
-import static org.apache.hadoop.fs.contract.ContractTestUtils.rm;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
 import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_ALGORITHM;
+import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_GENERATION;
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_CHECKSUM_GENERATION;
+import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_VALIDATION;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
+import static org.apache.hadoop.fs.s3a.S3AUtils.propagateBucketOptions;
 import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.REJECT_OUT_OF_SPAN_OPERATIONS;
 
 /**
@@ -40,31 +50,58 @@
  * If CHECKSUM_ALGORITHM config is not set in auth-keys.xml,
  * SHA256 algorithm will be picked.
  */
+@ParameterizedClass(name="checksum={0}")
+@MethodSource("params")
 public class ITestS3AChecksum extends AbstractS3ATestBase {
 
-  private static final ChecksumAlgorithm DEFAULT_CHECKSUM_ALGORITHM = ChecksumAlgorithm.SHA256;
+  public static final String UNKNOWN = "UNKNOWN_TO_SDK_VERSION";
 
   private ChecksumAlgorithm checksumAlgorithm;
 
+  /**
+   * Parameterization.
+   */
+  public static Collection<Object[]> params() {
+    return Arrays.asList(new Object[][]{
+        {"SHA256"},
+        {"CRC32C"},
+        {"SHA1"},
+        {UNKNOWN},
+    });
+  }
+
   private static final int[] SIZES = {
-      1, 2, 3, 4, 5, 254, 255, 256, 257, 2 ^ 12 - 1
+      5, 255, 256, 257, 2 ^ 12 - 1
   };
 
+  private final String algorithmName;
+
+  public ITestS3AChecksum(final String algorithmName) {
+    this.algorithmName = algorithmName;
+  }
+
   @Override
   protected Configuration createConfiguration() {
     final Configuration conf = super.createConfiguration();
+    // get the base checksum algorithm, if set it will be left alone.
+    final String al = conf.getTrimmed(CHECKSUM_ALGORITHM, "");
+    if (!UNKNOWN.equals(algorithmName) &&
+        (ChecksumSupport.NONE.equalsIgnoreCase(al) || UNKNOWN.equalsIgnoreCase(al))) {
+      skip("Skipping checksum algorithm tests");
+    }
     S3ATestUtils.removeBaseAndBucketOverrides(conf,
         CHECKSUM_ALGORITHM,
+        CHECKSUM_VALIDATION,
         REJECT_OUT_OF_SPAN_OPERATIONS);
     S3ATestUtils.disableFilesystemCaching(conf);
-    checksumAlgorithm = ChecksumSupport.getChecksumAlgorithm(conf);
-    if (checksumAlgorithm == null) {
-      checksumAlgorithm = DEFAULT_CHECKSUM_ALGORITHM;
-      LOG.info("No checksum algorithm found in configuration, will use default {}",
-          checksumAlgorithm);
-      conf.set(CHECKSUM_ALGORITHM, checksumAlgorithm.toString());
-    }
+    conf.set(CHECKSUM_ALGORITHM, algorithmName);
+    conf.setBoolean(CHECKSUM_VALIDATION, true);
     conf.setBoolean(REJECT_OUT_OF_SPAN_OPERATIONS, false);
+    checksumAlgorithm = ChecksumSupport.getChecksumAlgorithm(conf);
+    LOG.info("Using checksum algorithm {}/{}", algorithmName, checksumAlgorithm);
+    assume("Skipping checksum tests as " + CHECKSUM_GENERATION + " is set",
+        propagateBucketOptions(conf, getTestBucketName(conf))
+            .getBoolean(CHECKSUM_GENERATION, DEFAULT_CHECKSUM_GENERATION));
     return conf;
   }
 
@@ -77,14 +114,15 @@ public void testChecksum() throws IOException {
 
   private void validateChecksumForFilesize(int len) throws IOException {
     describe("Create a file of size " + len);
-    String src = String.format("%s-%04x", methodName.getMethodName(), len);
-    Path path = writeThenReadFile(src, len);
+    final Path path = methodPath();
+    writeThenReadFile(path, len);
     assertChecksum(path);
-    rm(getFileSystem(), path, false, false);
   }
 
   private void assertChecksum(Path path) throws IOException {
     final String key = getFileSystem().pathToKey(path);
+    // issue a head request and include asking for the checksum details.
+    // such a query may require extra IAM permissions.
     HeadObjectRequest.Builder requestBuilder = getFileSystem().getRequestFactory()
         .newHeadObjectRequestBuilder(key)
         .checksumMode(ChecksumMode.ENABLED);
@@ -101,6 +139,9 @@ private void assertChecksum(Path path) throws IOException {
       Assertions.assertThat(headObject.checksumCRC32C())
           .describedAs("headObject.checksumCRC32C()")
           .isNotNull();
+      Assertions.assertThat(headObject.checksumSHA256())
+          .describedAs("headObject.checksumSHA256()")
+          .isNull();
       break;
     case SHA1:
       Assertions.assertThat(headObject.checksumSHA1())
@@ -112,6 +153,14 @@ private void assertChecksum(Path path) throws IOException {
           .describedAs("headObject.checksumSHA256()")
           .isNotNull();
       break;
+    case UNKNOWN_TO_SDK_VERSION:
+      // expect values to be null
+      // this is brittle with different stores; crc32 assertions have been cut
+      // because S3 express always set them.
+      Assertions.assertThat(headObject.checksumSHA256())
+          .describedAs("headObject.checksumSHA256()")
+          .isNull();
+      break;
     default:
       fail("Checksum algorithm not supported: " + checksumAlgorithm);
     }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEndpointRegion.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEndpointRegion.java
index ae0daf8..25efe7a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEndpointRegion.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEndpointRegion.java
@@ -56,6 +56,7 @@
 import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.DefaultS3ClientFactory.ERROR_ENDPOINT_WITH_FIPS;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeNotS3ExpressFileSystem;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeStoreAwsHosted;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.DEFAULT_REQUESTER_PAYS_BUCKET_NAME;
@@ -362,7 +363,7 @@ public void testCentralEndpointAndDifferentRegionThanBucket() throws Throwable {
   public void testWithOutCrossRegionAccess() throws Exception {
     describe("Verify cross region access fails when disabled");
     // skip the test if the region is sa-east-1
-    skipCrossRegionTest();
+    assumeCrossRegionTestSupported();
     final Configuration newConf = new Configuration(getConfiguration());
     removeBaseAndBucketOverrides(newConf,
         ENDPOINT,
@@ -383,7 +384,7 @@ public void testWithOutCrossRegionAccess() throws Exception {
   public void testWithCrossRegionAccess() throws Exception {
     describe("Verify cross region access succeed when enabled");
     // skip the test if the region is sa-east-1
-    skipCrossRegionTest();
+    assumeCrossRegionTestSupported();
     final Configuration newConf = new Configuration(getConfiguration());
     removeBaseAndBucketOverrides(newConf,
         ENDPOINT,
@@ -484,6 +485,7 @@ public void testCentralEndpointAndNullRegionWithCRUD() throws Throwable {
     describe("Access the test bucket using central endpoint and"
         + " null region, perform file system CRUD operations");
     final Configuration conf = getConfiguration();
+    assumeCrossRegionTestSupported();
 
     final Configuration newConf = new Configuration(conf);
 
@@ -506,7 +508,7 @@ public void testCentralEndpointAndNullRegionWithCRUD() throws Throwable {
   public void testCentralEndpointAndNullRegionFipsWithCRUD() throws Throwable {
     describe("Access the test bucket using central endpoint and"
         + " null region and fips enabled, perform file system CRUD operations");
-    assumeStoreAwsHosted(getFileSystem());
+    assumeCrossRegionTestSupported();
 
     final String bucketLocation = getFileSystem().getBucketLocation();
     assume("FIPS can be enabled to access buckets from US or Canada endpoints only",
@@ -514,17 +516,19 @@ public void testCentralEndpointAndNullRegionFipsWithCRUD() throws Throwable {
             || bucketLocation.startsWith(CA_REGION_PREFIX)
             || bucketLocation.startsWith(US_DUAL_STACK_PREFIX));
 
-    final Configuration conf = getConfiguration();
+    final Configuration conf = getFileSystem().getConf();
     final Configuration newConf = new Configuration(conf);
 
     removeBaseAndBucketOverrides(
         newConf,
         ENDPOINT,
         AWS_REGION,
-        FIPS_ENDPOINT);
+        FIPS_ENDPOINT,
+        PATH_STYLE_ACCESS);
 
     newConf.set(ENDPOINT, CENTRAL_ENDPOINT);
     newConf.setBoolean(FIPS_ENDPOINT, true);
+    newConf.setBoolean(PATH_STYLE_ACCESS, false);
 
     newFS = new S3AFileSystem();
     newFS.initialize(getFileSystem().getUri(), newConf);
@@ -533,10 +537,18 @@ public void testCentralEndpointAndNullRegionFipsWithCRUD() throws Throwable {
   }
 
   /**
-   * Skip the test if the region is null or sa-east-1.
+   * Skip the test if the region is null,  sa-east-1, or otherwise
+   * not compatible with the test.
    */
-  private void skipCrossRegionTest() throws IOException {
-    String region = getFileSystem().getS3AInternals().getBucketMetadata().bucketRegion();
+  private void assumeCrossRegionTestSupported() throws IOException {
+    final S3AFileSystem fs = getFileSystem();
+
+    // not S3 as the store URLs may not resolve.
+    assumeNotS3ExpressFileSystem(fs);
+    // aws hosted.
+    assumeStoreAwsHosted(fs);
+
+    String region = fs.getS3AInternals().getBucketMetadata().bucketRegion();
     if (region == null || SA_EAST_1.equals(region)) {
       skip("Skipping test since region is null or it is set to sa-east-1");
     }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMultipartUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMultipartUtils.java
index 1fcc41a..83a3def 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMultipartUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMultipartUtils.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.fs.s3a;
 
 import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import software.amazon.awssdk.services.s3.model.MultipartUpload;
 
@@ -31,6 +32,7 @@
 import java.util.HashSet;
 import java.util.Set;
 
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeMultipartUploads;
 import static org.apache.hadoop.util.functional.RemoteIterators.foreach;
 
 /**
@@ -54,6 +56,13 @@ protected Configuration createConfiguration() {
     return conf;
   }
 
+  @Override
+  @BeforeEach
+  public void setup() throws Exception {
+    super.setup();
+    assumeMultipartUploads(getFileSystem().getConf());
+  }
+
   /**
    * Main test case for upload part listing and iterator paging.
    * @throws Exception on failure.
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java
index 5e66d69..05eb8c9 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java
@@ -126,8 +126,6 @@ public void testReadLargeFileFully() throws Throwable {
     }
     // Verify that once stream is closed, all memory is freed
     verifyStatisticGaugeValue(ioStats, STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
-    assertThatStatisticMaximum(ioStats,
-        ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isGreaterThan(0);
   }
 
   @Test
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
index 8743907..259a8a5 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
@@ -284,4 +284,22 @@ public interface S3ATestConstants {
    * Default policy on root tests: {@value}.
    */
   boolean DEFAULT_ROOT_TESTS_ENABLED = true;
+
+  /**
+   * Flag to set when testing third party stores: {@value}.
+   * <p>
+   * Set to true when a completed MPU commit consumes the ID so it is no
+   * longer visible in list operations; and abort reports {@code NoSuchUploadException}.
+   * <p>
+   * This will change assertions in relevant tests.
+   * <p>
+   * Can be set as a per-bucket setting; test runner will pick this up.
+   */
+  String MULTIPART_COMMIT_CONSUMES_UPLOAD_ID =
+      "fs.s3a.ext.test.multipart.commit.consumes.upload.id";
+
+  /**
+   * Default value of {@link #MULTIPART_COMMIT_CONSUMES_UPLOAD_ID}: {@value}.
+   */
+  boolean DEFAULT_MULTIPART_COMMIT_CONSUMES_UPLOAD_ID = false;
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index 07af6e6..1eb302a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -534,6 +534,32 @@ public static void skipIfNotEnabled(final Configuration configuration,
   }
 
   /**
+   * Skip a test suite/case if a configuration option is true.
+   * @param configuration configuration to probe
+   * @param key key to resolve
+   * @param defVal default value.
+   * @param message assertion text
+   */
+  public static void skipIfEnabled(final Configuration configuration,
+      final String key,
+      final boolean defVal,
+      final String message) {
+    if (!configuration.getBoolean(key, defVal)) {
+      skip(message);
+    }
+  }
+
+  /**
+   * Require multipart uploads; skip tests if not enabled in the configuration.
+   * @param conf filesystem configuration.
+   */
+  public static void assumeMultipartUploads(Configuration conf) {
+    skipIfNotEnabled(conf,
+        MULTIPART_UPLOADS_ENABLED,
+        "Store has disabled multipart uploads; skipping tests");
+  }
+
+  /**
    * Skip a test if storage class tests are disabled,
    * or the bucket is an S3Express bucket.
    * @param configuration configuration to probe
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AExceptionTranslation.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AExceptionTranslation.java
index 2e7ed1c..9dbf92b 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AExceptionTranslation.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AExceptionTranslation.java
@@ -453,6 +453,19 @@ public void testOpenSSLErrorRetry() throws Throwable {
                 sdkClientException(WFOPENSSL_0035_STREAM_IS_CLOSED, null))));
   }
 
+  @Test
+  public void testS3ExpressPreconditionFailure() throws Throwable {
+    AwsServiceException ase = AwsServiceException.builder()
+        .message("unwind")
+        .statusCode(SC_200_OK)
+        .awsErrorDetails(AwsErrorDetails.builder()
+            .errorCode(PRECONDITION_FAILED)
+            .build())
+        .build();
+    verifyExceptionClass(RemoteFileChangedException.class,
+        translateException("commit", "/path", ase));
+  }
+
   /**
    * Create a shaded NoHttpResponseException.
    * @return an exception.
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumeRole.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumeRole.java
index d9c84fa..58eb3b6 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumeRole.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumeRole.java
@@ -86,6 +86,10 @@
 /**
  * Tests use of assumed roles.
  * Only run if an assumed role is provided.
+ * <p>
+ * S3Express buckets only support access restrictions at the bucket level,
+ * rather than at paths underneath.
+ * All partial permission tests are disabled.
  */
 @SuppressWarnings("ThrowableNotThrown")
 public class ITestAssumeRole extends AbstractS3ATestBase {
@@ -201,9 +205,6 @@ protected Configuration createValidRoleConf() throws JsonProcessingException {
     conf.set(ASSUMED_ROLE_ARN, roleARN);
     conf.set(ASSUMED_ROLE_SESSION_NAME, "valid");
     conf.set(ASSUMED_ROLE_SESSION_DURATION, "45m");
-    // disable create session so there's no need to
-    // add a role policy for it.
-    disableCreateSession(conf);
 
     bindRolePolicy(conf, RESTRICTED_POLICY);
     return conf;
@@ -462,12 +463,15 @@ public void testAssumeRolePoliciesOverrideRolePerms() throws Throwable {
   public void testReadOnlyOperations() throws Throwable {
 
     describe("Restrict role to read only");
+    skipIfS3ExpressBucket(getConfiguration());
     Configuration conf = createAssumedRoleConfig();
 
     bindRolePolicy(conf,
         policy(
             statement(false, S3_ALL_BUCKETS, S3_PATH_WRITE_OPERATIONS),
-            STATEMENT_ALL_S3, STATEMENT_ALLOW_KMS_RW));
+            STATEMENT_ALL_S3,
+            STATEMENT_S3EXPRESS,
+            STATEMENT_ALLOW_KMS_RW));
     Path path = methodPath();
     roleFS = (S3AFileSystem) path.getFileSystem(conf);
     // list the root path, expect happy
@@ -505,6 +509,7 @@ public void testRestrictedWriteSubdir() throws Throwable {
 
     describe("Attempt writing to paths where a role only has"
         + " write access to a subdir of the bucket");
+    skipIfS3ExpressBucket(getConfiguration());
     Path restrictedDir = methodPath();
     Path child = new Path(restrictedDir, "child");
     // the full FS
@@ -567,6 +572,7 @@ public void testAssumedRoleRetryHandler() throws Throwable {
   @Test
   public void testRestrictedCommitActions() throws Throwable {
     describe("Attempt commit operations against a path with restricted rights");
+    skipIfS3ExpressBucket(getConfiguration());
     Configuration conf = createAssumedRoleConfig();
     final int uploadPartSize = 5 * 1024 * 1024;
 
@@ -704,12 +710,14 @@ public void writeCSVData(final File localSrc) throws IOException {
   @Test
   public void testPartialDelete() throws Throwable {
     describe("delete with part of the child tree read only; multidelete");
+    skipIfS3ExpressBucket(getConfiguration());
     executePartialDelete(createAssumedRoleConfig(), false);
   }
 
   @Test
   public void testPartialDeleteSingleDelete() throws Throwable {
     describe("delete with part of the child tree read only");
+    skipIfS3ExpressBucket(getConfiguration());
     executePartialDelete(createAssumedRoleConfig(), true);
   }
 
@@ -722,6 +730,7 @@ public void testBulkDeleteOnReadOnlyAccess() throws Throwable {
   @Test
   public void testBulkDeleteWithReadWriteAccess() throws Throwable {
     describe("Bulk delete with read write access");
+    skipIfS3ExpressBucket(getConfiguration());
     executeBulkDeleteOnSomeReadOnlyFiles(createAssumedRoleConfig());
   }
 
@@ -811,6 +820,7 @@ private static void bindReadOnlyRolePolicy(Configuration assumedRoleConfig,
           throws JsonProcessingException {
     bindRolePolicyStatements(assumedRoleConfig, STATEMENT_ALLOW_KMS_RW,
         statement(true, S3_ALL_BUCKETS, S3_ALL_OPERATIONS),
+        STATEMENT_S3EXPRESS,
         new Statement(Effects.Deny)
             .addActions(S3_PATH_WRITE_OPERATIONS)
             .addResources(directory(readOnlyDir))
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumedRoleCommitOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumedRoleCommitOperations.java
index 29201a8..721cb01 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumedRoleCommitOperations.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumedRoleCommitOperations.java
@@ -70,7 +70,9 @@ public void setup() throws Exception {
     restrictedDir = super.path("restricted");
     Configuration conf = newAssumedRoleConfig(getConfiguration(),
         getAssumedRoleARN());
-    bindRolePolicyStatements(conf, STATEMENT_ALLOW_KMS_RW,
+    bindRolePolicyStatements(conf,
+        STATEMENT_ALLOW_KMS_RW,
+        STATEMENT_S3EXPRESS,
         statement(true, S3_ALL_BUCKETS, S3_BUCKET_READ_OPERATIONS),
         new RoleModel.Statement(RoleModel.Effects.Allow)
             .addActions(S3_PATH_RW_OPERATIONS)
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestCustomSigner.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestCustomSigner.java
index 1a4d354..3833e2c 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestCustomSigner.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestCustomSigner.java
@@ -25,6 +25,7 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.junit.jupiter.api.AfterEach;
@@ -53,15 +54,19 @@
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.auth.ITestCustomSigner.CustomSignerInitializer.StoreValue;
 import org.apache.hadoop.fs.s3a.auth.delegation.DelegationTokenProvider;
+import org.apache.hadoop.fs.s3a.impl.ChecksumSupport;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_ALGORITHM;
+import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_VALIDATION;
 import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_SIGNERS;
 import static org.apache.hadoop.fs.s3a.Constants.ENABLE_MULTI_DELETE;
+import static org.apache.hadoop.fs.s3a.Constants.PATH_STYLE_ACCESS;
 import static org.apache.hadoop.fs.s3a.Constants.SIGNING_ALGORITHM_S3;
 import static org.apache.hadoop.fs.s3a.MultipartTestUtils.createMagicFile;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotEnabled;
 
 /**
  * Tests for custom Signers and SignerInitializers.
@@ -79,6 +84,13 @@ public class ITestCustomSigner extends AbstractS3ATestBase {
   private static final String TEST_REGION_KEY = "TEST_REGION_KEY";
 
   /**
+   * Is the store using path style access?
+   */
+  private static final AtomicBoolean PATH_STYLE_ACCESS_IN_USE = new AtomicBoolean(false);
+
+  public static final String BUCKET = "bucket";
+
+  /**
    * Parameterization.
    */
   public static Collection<Object[]> params() {
@@ -118,7 +130,11 @@ public void setup() throws Exception {
     super.setup();
     final S3AFileSystem fs = getFileSystem();
     final Configuration conf = fs.getConf();
+    if (bulkDelete) {
+      skipIfNotEnabled(conf, ENABLE_MULTI_DELETE, "no bulk delete");
+    }
     endpoint = conf.getTrimmed(Constants.ENDPOINT, Constants.CENTRAL_ENDPOINT);
+    PATH_STYLE_ACCESS_IN_USE.set(conf.getBoolean(PATH_STYLE_ACCESS, false));
     LOG.info("Test endpoint is {}", endpoint);
     regionName = conf.getTrimmed(Constants.AWS_REGION, "");
     if (regionName.isEmpty()) {
@@ -166,6 +182,7 @@ private S3AFileSystem runStoreOperationsAndVerify(UserGroupInformation ugi,
       throws IOException, InterruptedException {
     Configuration conf = createTestConfig(identifier);
     return ugi.doAs((PrivilegedExceptionAction<S3AFileSystem>) () -> {
+      LOG.info("Performing store operations for {}", ugi.getShortUserName());
       int instantiationCount = CustomSigner.getInstantiationCount();
       int invocationCount = CustomSigner.getInvocationCount();
       S3AFileSystem fs = (S3AFileSystem)finalPath.getFileSystem(conf);
@@ -199,11 +216,13 @@ private S3AFileSystem runStoreOperationsAndVerify(UserGroupInformation ugi,
       ContractTestUtils.touch(fs, new Path(subdir, "file1"));
 
       // create a magic file.
-      createMagicFile(fs, subdir);
-      ContentSummary summary = fs.getContentSummary(finalPath);
-      fs.getS3AInternals().abortMultipartUploads(subdir);
-      fs.rename(subdir, new Path(finalPath, "renamed"));
-      fs.delete(finalPath, true);
+      if (fs.isMagicCommitEnabled()) {
+        createMagicFile(fs, subdir);
+        ContentSummary summary = fs.getContentSummary(finalPath);
+        fs.getS3AInternals().abortMultipartUploads(subdir);
+        fs.rename(subdir, new Path(finalPath, "renamed"));
+        fs.delete(finalPath, true);
+      }
       return fs;
     });
   }
@@ -217,10 +236,13 @@ private S3AFileSystem runStoreOperationsAndVerify(UserGroupInformation ugi,
   private Configuration createTestConfig(String identifier) {
     Configuration conf = createConfiguration();
 
+    // bulk delete is not disabled; if it has been set to false by the bucket
+    // then one of the test runs will be skipped.
     removeBaseAndBucketOverrides(conf,
+        CHECKSUM_ALGORITHM,
+        CHECKSUM_VALIDATION,
         CUSTOM_SIGNERS,
-        SIGNING_ALGORITHM_S3,
-        ENABLE_MULTI_DELETE);
+        SIGNING_ALGORITHM_S3);
     conf.set(CUSTOM_SIGNERS,
         "CustomS3Signer:" + CustomSigner.class.getName() + ":"
             + CustomSignerInitializer.class.getName());
@@ -233,7 +255,8 @@ private Configuration createTestConfig(String identifier) {
     // Having the checksum algorithm in this test causes
     // x-amz-sdk-checksum-algorithm specified, but no corresponding
     // x-amz-checksum-* or x-amz-trailer headers were found
-    conf.unset(CHECKSUM_ALGORITHM);
+    conf.set(CHECKSUM_ALGORITHM, ChecksumSupport.NONE);
+    conf.setBoolean(CHECKSUM_VALIDATION, false);
 
     // make absolutely sure there is no caching.
     disableFilesystemCaching(conf);
@@ -283,6 +306,9 @@ public SdkHttpFullRequest sign(SdkHttpFullRequest request,
 
       String host = request.host();
       String bucketName = parseBucketFromHost(host);
+      if (PATH_STYLE_ACCESS_IN_USE.get()) {
+        bucketName = BUCKET;
+      }
       try {
         lastStoreValue = CustomSignerInitializer
             .getStoreValue(bucketName, UserGroupInformation.getCurrentUser());
@@ -325,11 +351,20 @@ public static String description() {
   public static final class CustomSignerInitializer
       implements AwsSignerInitializer {
 
+    /**
+     * Map of (bucket-name, ugi) -> store value.
+     * <p>
+     * When working with buckets using path-style resolution, the store bucket name
+     * is just {@link #BUCKET}.
+     */
     private static final Map<StoreKey, StoreValue> knownStores = new HashMap<>();
 
     @Override
     public void registerStore(String bucketName, Configuration storeConf,
         DelegationTokenProvider dtProvider, UserGroupInformation storeUgi) {
+      if (PATH_STYLE_ACCESS_IN_USE.get()) {
+        bucketName = BUCKET;
+      }
       StoreKey storeKey = new StoreKey(bucketName, storeUgi);
       StoreValue storeValue = new StoreValue(storeConf, dtProvider);
       LOG.info("Registering store {} with value {}", storeKey, storeValue);
@@ -339,6 +374,9 @@ public void registerStore(String bucketName, Configuration storeConf,
     @Override
     public void unregisterStore(String bucketName, Configuration storeConf,
         DelegationTokenProvider dtProvider, UserGroupInformation storeUgi) {
+      if (PATH_STYLE_ACCESS_IN_USE.get()) {
+        bucketName = BUCKET;
+      }
       StoreKey storeKey = new StoreKey(bucketName, storeUgi);
       LOG.info("Unregistering store {}", storeKey);
       knownStores.remove(storeKey);
@@ -354,9 +392,17 @@ public static void reset() {
     public static StoreValue getStoreValue(String bucketName,
         UserGroupInformation ugi) {
       StoreKey storeKey = new StoreKey(bucketName, ugi);
-      return knownStores.get(storeKey);
+      final StoreValue storeValue = knownStores.get(storeKey);
+      LOG.info("Getting store value for key {}: {}", storeKey, storeValue);
+      return storeValue;
     }
 
+    /**
+     * The key for the signer map: bucket-name and UGI.
+     * <p>
+     * In path-style-access the bucket name is mapped to {@link #BUCKET} so only
+     * one bucket per UGI instance is supported.
+     */
     private static class StoreKey {
       private final String bucketName;
       private final UserGroupInformation ugi;
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestHttpSigner.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestHttpSigner.java
index 1a60c01..a4ffae9 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestHttpSigner.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestHttpSigner.java
@@ -144,11 +144,13 @@ private S3AFileSystem runStoreOperationsAndVerify(UserGroupInformation ugi,
       ContractTestUtils.touch(fs, new Path(subdir, "file1"));
 
       // create a magic file.
-      createMagicFile(fs, subdir);
-      ContentSummary summary = fs.getContentSummary(finalPath);
-      fs.getS3AInternals().abortMultipartUploads(subdir);
-      fs.rename(subdir, new Path(finalPath, "renamed"));
-      fs.delete(finalPath, true);
+      if (fs.isMagicCommitEnabled()) {
+        createMagicFile(fs, subdir);
+        ContentSummary summary = fs.getContentSummary(finalPath);
+        fs.getS3AInternals().abortMultipartUploads(subdir);
+        fs.rename(subdir, new Path(finalPath, "renamed"));
+        fs.delete(finalPath, true);
+      }
       return fs;
     });
   }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestRestrictedReadAccess.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestRestrictedReadAccess.java
index ae05a8d..8f00c11 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestRestrictedReadAccess.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestRestrictedReadAccess.java
@@ -49,6 +49,7 @@
 import static org.apache.hadoop.fs.s3a.Constants.ASSUMED_ROLE_ARN;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.lsR;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfS3ExpressBucket;
 import static org.apache.hadoop.fs.s3a.auth.RoleModel.Effects;
 import static org.apache.hadoop.fs.s3a.auth.RoleModel.Statement;
 import static org.apache.hadoop.fs.s3a.auth.RoleModel.directory;
@@ -88,7 +89,9 @@
  * To simplify maintenance, the operations tested are broken up into
  * their own methods, with fields used to share the restricted role and
  * created paths.
- *
+ * <p>
+ * Test are skipped if no assumed role was provided, or if the test bucket
+ * is an S3Express bucket, whose permissions model is different.
  */
 public class ITestRestrictedReadAccess extends AbstractS3ATestBase {
 
@@ -161,6 +164,7 @@ public class ITestRestrictedReadAccess extends AbstractS3ATestBase {
   public void setup() throws Exception {
     super.setup();
     assumeRoleTests();
+    skipIfS3ExpressBucket(getConfiguration());
   }
 
   @AfterEach
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/RoleTestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/RoleTestUtils.java
index a1dd177..579c5b8 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/RoleTestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/RoleTestUtils.java
@@ -164,8 +164,8 @@ public static Configuration newAssumedRoleConfig(
     conf.set(ASSUMED_ROLE_ARN, roleARN);
     conf.set(ASSUMED_ROLE_SESSION_NAME, "test");
     conf.set(ASSUMED_ROLE_SESSION_DURATION, "15m");
-    // force in bucket resolution during startup
-    conf.setInt(S3A_BUCKET_PROBE, 1);
+    // disable bucket resolution during startup as s3 express doesn't like it
+    conf.setInt(S3A_BUCKET_PROBE, 0);
     disableCreateSession(conf);
     disableFilesystemCaching(conf);
     return conf;
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
index 5fc5cf9..b270b5d 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
@@ -173,7 +173,7 @@ public void setup() throws Exception {
     taskAttempt0 = TaskAttemptID.forName(attempt0);
     attempt1 = "attempt_" + jobId + "_m_000001_0";
     taskAttempt1 = TaskAttemptID.forName(attempt1);
-
+    assumeMultipartUploads(getFileSystem().getConf());
     outDir = path(getMethodName());
     abortMultipartUploadsUnderPath(outDir);
     cleanupDestDir();
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java
index 9ad2c06..fcc2410 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java
@@ -42,6 +42,7 @@
 import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
 import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
 
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeMultipartUploads;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static org.apache.hadoop.fs.s3a.Statistic.ACTION_HTTP_GET_REQUEST;
 import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_FILES_CREATED;
@@ -84,6 +85,7 @@ public class ITestCommitOperationCost extends AbstractS3ACostTest {
   @Override
   public void setup() throws Exception {
     super.setup();
+    assumeMultipartUploads(getFileSystem().getConf());
     testHelper = new CommitterTestHelper(getFileSystem());
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java
index ddd306d..005235c 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java
@@ -94,6 +94,7 @@ protected Configuration createConfiguration() {
   public void setup() throws Exception {
     FileSystem.closeAll();
     super.setup();
+    assumeMultipartUploads(getFileSystem().getConf());
     verifyIsMagicCommitFS(getFileSystem());
     progress = new ProgressCounter();
     progress.assertCount("progress", 0);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java
index 2956bfc..fb18ccb 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java
@@ -47,6 +47,7 @@
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.security.UserGroupInformation;
 
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeMultipartUploads;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
 import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.COMMITTER_NAME_STAGING;
@@ -186,6 +187,7 @@ public void setup() throws Exception {
     // destroy all filesystems from previous runs.
     FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser());
     super.setup();
+    assumeMultipartUploads(getFileSystem().getConf());
     jobId = randomJobId();
     attempt0 = "attempt_" + jobId + "_m_000000_0";
     taskAttempt0 = TaskAttemptID.forName(attempt0);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestUploadRecovery.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestUploadRecovery.java
index c5144a4..8e03def 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestUploadRecovery.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestUploadRecovery.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.fs.s3a.commit;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.UUID;
@@ -59,11 +60,13 @@
 import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE;
 import static org.apache.hadoop.fs.s3a.Constants.MAX_ERROR_RETRIES;
 import static org.apache.hadoop.fs.s3a.Constants.RETRY_HTTP_5XX_ERRORS;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeMultipartUploads;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.AUDIT_EXECUTION_INTERCEPTORS;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.BASE;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_PATH_PREFIX;
 import static org.apache.hadoop.fs.s3a.test.SdkFaultInjector.setRequestFailureConditions;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
 /**
  * Test upload recovery by injecting failures into the response chain.
@@ -159,6 +162,10 @@ public Configuration createConfiguration() {
   public void setup() throws Exception {
     SdkFaultInjector.resetFaultInjector();
     super.setup();
+    if (!FAST_UPLOAD_BUFFER_DISK.equals(buffer)) {
+      assumeMultipartUploads(getFileSystem().getConf());
+    }
+
   }
 
   @AfterEach
@@ -167,7 +174,6 @@ public void teardown() throws Exception {
     // safety check in case the evaluation is failing any
     // request needed in cleanup.
     SdkFaultInjector.resetFaultInjector();
-
     super.teardown();
   }
 
@@ -264,9 +270,18 @@ public void testCommitOperations() throws Throwable {
     setRequestFailureConditions(2,
         SdkFaultInjector::isCompleteMultipartUploadRequest);
 
+    boolean mpuCommitConsumesUploadId = getFileSystem().getConf().getBoolean(
+        MULTIPART_COMMIT_CONSUMES_UPLOAD_ID,
+        DEFAULT_MULTIPART_COMMIT_CONSUMES_UPLOAD_ID);
     try (CommitContext commitContext
              = actions.createCommitContextForTesting(dest, JOB_ID, 0)) {
-      commitContext.commitOrFail(commit);
+
+      if (mpuCommitConsumesUploadId) {
+        intercept(FileNotFoundException.class, () ->
+            commitContext.commitOrFail(commit));
+      } else {
+        commitContext.commitOrFail(commit);
+      }
     }
     // make sure the saved data is as expected
     verifyFileContents(fs, dest, dataset);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java
index 8ef109d..e091ee3 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java
@@ -73,6 +73,7 @@
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.DurationInfo;
 
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeMultipartUploads;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.lsR;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
@@ -184,6 +185,7 @@ public ITestS3ACommitterMRJob(
   @BeforeEach
   public void setup() throws Exception {
     super.setup();
+    assumeMultipartUploads(getFileSystem().getConf());
     // configure the test binding for this specific test case.
     committerTestBinding.setup(getClusterBinding(), getFileSystem());
   }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestS3AHugeMagicCommits.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestS3AHugeMagicCommits.java
index edcc7bf..2b8ca4b 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestS3AHugeMagicCommits.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestS3AHugeMagicCommits.java
@@ -120,6 +120,15 @@ public void setup() throws Exception {
   }
 
   /**
+   * Skip this test suite when MPUS are not avaialable.
+   * @return false
+   */
+  @Override
+  protected boolean requireMultipartUploads() {
+    return true;
+  }
+
+  /**
    * Returns the path to the commit metadata file, not that of the huge file.
    * @return a file in the job dir
    */
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortOnS3A.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortOnS3A.java
index 536a158..ae6bd59 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortOnS3A.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortOnS3A.java
@@ -57,6 +57,7 @@
 import org.apache.hadoop.util.ToolRunner;
 
 import static java.util.Optional.empty;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeMultipartUploads;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.lsR;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assumptions.assumeTrue;
@@ -148,6 +149,7 @@ protected String committerName() {
   public void setup() throws Exception {
     super.setup();
     requireScaleTestsEnabled();
+    assumeMultipartUploads(getFileSystem().getConf());
     prepareToTerasort();
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestConnectionTimeouts.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestConnectionTimeouts.java
index 09911b3..142a71b 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestConnectionTimeouts.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestConnectionTimeouts.java
@@ -53,6 +53,8 @@
 import static org.apache.hadoop.fs.s3a.Constants.ESTABLISH_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE;
 import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_PERFORMANCE_FLAGS;
+import static org.apache.hadoop.fs.s3a.Constants.INPUT_STREAM_TYPE;
+import static org.apache.hadoop.fs.s3a.Constants.INPUT_STREAM_TYPE_CLASSIC;
 import static org.apache.hadoop.fs.s3a.Constants.MAXIMUM_CONNECTIONS;
 import static org.apache.hadoop.fs.s3a.Constants.MAX_ERROR_RETRIES;
 import static org.apache.hadoop.fs.s3a.Constants.PART_UPLOAD_TIMEOUT;
@@ -111,6 +113,7 @@ private Configuration timingOutConfiguration() {
         CONNECTION_ACQUISITION_TIMEOUT,
         CONNECTION_IDLE_TIME,
         ESTABLISH_TIMEOUT,
+        INPUT_STREAM_TYPE,
         MAX_ERROR_RETRIES,
         MAXIMUM_CONNECTIONS,
         PART_UPLOAD_TIMEOUT,
@@ -126,6 +129,7 @@ private Configuration timingOutConfiguration() {
     // needed to ensure that streams are kept open.
     // without this the tests is unreliable in batch runs.
     disablePrefetching(conf);
+    conf.set(INPUT_STREAM_TYPE, INPUT_STREAM_TYPE_CLASSIC);
     conf.setInt(RETRY_LIMIT, 0);
     conf.setBoolean(FS_S3A_CREATE_PERFORMANCE, true);
     final Duration ms10 = Duration.ofMillis(10);
@@ -192,7 +196,7 @@ public void testObjectUploadTimeouts() throws Throwable {
     AWSClientConfig.setMinimumOperationDuration(Duration.ZERO);
     final Path dir = methodPath();
     Path file = new Path(dir, "file");
-    Configuration conf = new Configuration(getConfiguration());
+    Configuration conf = new Configuration(getFileSystem().getConf());
     removeBaseAndBucketOverrides(conf,
         PART_UPLOAD_TIMEOUT,
         REQUEST_TIMEOUT,
@@ -259,20 +263,21 @@ public void testObjectUploadTimeouts() throws Throwable {
 
       // and try a multipart upload to verify that its requests also outlast
       // the short requests
-      SdkFaultInjector.setRequestFailureConditions(999,
-          SdkFaultInjector::isPartUpload);
-      Path magicFile = new Path(dir, MAGIC_PATH_PREFIX + "0001/__base/file2");
-      totalSleepTime.set(0);
-      OperationDuration dur2 = new DurationInfo(LOG, "Creating File");
-      ContractTestUtils.createFile(brittleFS, magicFile, true, DATASET);
-      dur2.finished();
-      Assertions.assertThat(totalSleepTime.get())
-          .describedAs("total sleep time of magic write")
-          .isGreaterThan(0);
-      Assertions.assertThat(dur2.asDuration())
-          .describedAs("Duration of magic write")
-          .isGreaterThan(shortTimeout);
-      brittleFS.delete(dir, true);
+      if (fs.isMagicCommitEnabled()) {
+        SdkFaultInjector.setRequestFailureConditions(999,
+            SdkFaultInjector::isPartUpload);
+        Path magicFile = new Path(dir, MAGIC_PATH_PREFIX + "0001/__base/file2");
+        totalSleepTime.set(0);
+        OperationDuration dur2 = new DurationInfo(LOG, "Creating File");
+        ContractTestUtils.createFile(brittleFS, magicFile, true, DATASET);
+        dur2.finished();
+        Assertions.assertThat(totalSleepTime.get())
+            .describedAs("total sleep time of magic write")
+            .isGreaterThan(0);
+        Assertions.assertThat(dur2.asDuration())
+            .describedAs("Duration of magic write")
+            .isGreaterThan(shortTimeout);
+      }
     }
   }
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java
index 8afb60d..31c99bc 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java
@@ -74,7 +74,9 @@
  * Test partial failures of delete and rename operations,.
  *
  * All these test have a unique path for each run, with a roleFS having
- * full RW access to part of it, and R/O access to a restricted subdirectory
+ * full RW access to part of it, and R/O access to a restricted subdirectory.
+ * <p>
+ * Tests are skipped on S3Express buckets or if no assumed role is provided.
  *
  * <ol>
  *   <li>
@@ -221,6 +223,7 @@ public ITestPartialRenamesDeletes(final boolean multiDelete) {
   public void setup() throws Exception {
     super.setup();
     assumeRoleTests();
+    skipIfS3ExpressBucket(getConfiguration());
     basePath = uniquePath();
     readOnlyDir = new Path(basePath, "readonlyDir");
     writableDir = new Path(basePath, "writableDir");
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatchAndIfNoneMatch.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatchAndIfNoneMatch.java
index 9922494..3e02857 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatchAndIfNoneMatch.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatchAndIfNoneMatch.java
@@ -43,10 +43,12 @@
 import org.apache.hadoop.fs.s3a.S3ATestUtils;
 import org.apache.hadoop.fs.s3a.Statistic;
 import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
+import org.apache.hadoop.test.LambdaTestUtils;
 
 import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE;
 import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_200_OK;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
 import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_ARRAY;
 import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_MULTIPART;
@@ -76,6 +78,8 @@ public class ITestS3APutIfMatchAndIfNoneMatch extends AbstractS3ATestBase {
   private static final byte[] SMALL_FILE_BYTES = dataset(TEST_FILE_LEN, 0, 255);
   private static final byte[] MULTIPART_FILE_BYTES = dataset(UPDATED_MULTIPART_THRESHOLD * 5, 'a', 'z' - 'a');
 
+  public static final String PRECONDITION_FAILED = "PreconditionFailed";
+
   private BlockOutputStreamStatistics statistics;
 
   @Override
@@ -101,22 +105,30 @@ public Configuration createConfiguration() {
   @BeforeEach
   public void setup() throws Exception {
     super.setup();
-    Configuration conf = getConfiguration();
+    Configuration conf = getFileSystem().getConf();
     assumeConditionalCreateEnabled(conf);
   }
 
   /**
    * Asserts that an S3Exception has the expected HTTP status code.
-   *
    * @param code Expected HTTP status code.
-   * @param ex   Exception to validate.
+   * @param ex Exception to validate.
+   * @return the inner exception
+   * @throws AssertionError if the status code doesn't match.
    */
-  private static void assertS3ExceptionStatusCode(int code, Exception ex) {
-    S3Exception s3Exception = (S3Exception) ex.getCause();
-
+  private static S3Exception verifyS3ExceptionStatusCode(int code, Exception ex) {
+    final Throwable cause = ex.getCause();
+    if (cause == null) {
+      throw new AssertionError("No inner exception of" + ex, ex);
+    }
+    if (!(cause instanceof S3Exception)) {
+      throw new AssertionError("Inner exception is not S3Exception under " + ex, ex);
+    }
+    S3Exception s3Exception = (S3Exception) cause;
     if (s3Exception.statusCode() != code) {
       throw new AssertionError("Expected status code " + code + " from " + ex, ex);
     }
+    return s3Exception;
   }
 
   /**
@@ -296,12 +308,12 @@ public void testIfNoneMatchConflictOnOverwrite() throws Throwable {
     // attempted overwrite fails
     RemoteFileChangedException firstException = intercept(RemoteFileChangedException.class,
             () -> createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, true, null));
-    assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, firstException);
+    verifyS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, firstException);
 
     // second attempt also fails
     RemoteFileChangedException secondException = intercept(RemoteFileChangedException.class,
             () -> createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, true, null));
-    assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, secondException);
+    verifyS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, secondException);
 
     // Delete file and verify an overwrite works again
     fs.delete(testFile, false);
@@ -320,13 +332,11 @@ public void testIfNoneMatchConflictOnMultipartUpload() throws Throwable {
 
     createFileWithFlags(fs, testFile, MULTIPART_FILE_BYTES, true, null, true);
 
-    RemoteFileChangedException firstException = intercept(RemoteFileChangedException.class,
-            () -> createFileWithFlags(fs, testFile, MULTIPART_FILE_BYTES, true, null, true));
-    assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, firstException);
+    expectPreconditionFailure(() ->
+        createFileWithFlags(fs, testFile, MULTIPART_FILE_BYTES, true, null, true));
 
-    RemoteFileChangedException secondException = intercept(RemoteFileChangedException.class,
-            () -> createFileWithFlags(fs, testFile, MULTIPART_FILE_BYTES, true, null, true));
-    assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, secondException);
+    expectPreconditionFailure(() ->
+        createFileWithFlags(fs, testFile, MULTIPART_FILE_BYTES, true, null, true));
   }
 
   @Test
@@ -348,8 +358,7 @@ public void testIfNoneMatchMultipartUploadWithRaceCondition() throws Throwable {
     createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, true, null);
 
     // Closing the first stream should throw RemoteFileChangedException
-    RemoteFileChangedException exception = intercept(RemoteFileChangedException.class, stream::close);
-    assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception);
+    expectPreconditionFailure(stream::close);
   }
 
   @Test
@@ -371,8 +380,24 @@ public void testIfNoneMatchTwoConcurrentMultipartUploads() throws Throwable {
     createFileWithFlags(fs, testFile, MULTIPART_FILE_BYTES, true, null, true);
 
     // Closing the first stream should throw RemoteFileChangedException
-    RemoteFileChangedException exception = intercept(RemoteFileChangedException.class, stream::close);
-    assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception);
+    // or or the S3 Express equivalent.
+    expectPreconditionFailure(stream::close);
+  }
+
+  /**
+   * Expect an operation to fail with an S3 classic or S3 Express precondition failure.
+   * @param eval closure to eval
+   * @throws Exception any other failure.
+   */
+  private static void expectPreconditionFailure(final LambdaTestUtils.VoidCallable eval)
+      throws Exception {
+    RemoteFileChangedException exception = intercept(RemoteFileChangedException.class, eval);
+    S3Exception s3Exception = (S3Exception) exception.getCause();
+    if (!(s3Exception.statusCode() == SC_412_PRECONDITION_FAILED
+        || (s3Exception.statusCode() == SC_200_OK)
+        && PRECONDITION_FAILED.equals(s3Exception.awsErrorDetails().errorCode()))) {
+      throw exception;
+    }
   }
 
   @Test
@@ -390,7 +415,7 @@ public void testIfNoneMatchOverwriteWithEmptyFile() throws Throwable {
 
     // close the stream, should throw RemoteFileChangedException
     RemoteFileChangedException exception = intercept(RemoteFileChangedException.class, stream::close);
-    assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception);
+    verifyS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception);
   }
 
   @Test
@@ -407,7 +432,7 @@ public void testIfNoneMatchOverwriteEmptyFileWithFile() throws Throwable {
     // overwrite with non-empty file, should throw RemoteFileChangedException
     RemoteFileChangedException exception = intercept(RemoteFileChangedException.class,
             () -> createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, true, null));
-    assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception);
+    verifyS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception);
   }
 
   @Test
@@ -425,7 +450,7 @@ public void testIfNoneMatchOverwriteEmptyWithEmptyFile() throws Throwable {
     FSDataOutputStream stream2 = getStreamWithFlags(fs, testFile, true, null);
     assertHasCapabilityConditionalCreate(stream2);
     RemoteFileChangedException exception = intercept(RemoteFileChangedException.class, stream2::close);
-    assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception);
+    verifyS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception);
   }
 
   @Test
@@ -480,7 +505,7 @@ public void testIfMatchOverwriteWithOutdatedEtag() throws Throwable {
     // overwrite file with outdated etag. Should throw RemoteFileChangedException
     RemoteFileChangedException exception = intercept(RemoteFileChangedException.class,
             () -> createFileWithFlags(fs, path, SMALL_FILE_BYTES, false, etag));
-    assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception);
+    verifyS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception);
   }
 
   @Test
@@ -504,7 +529,7 @@ public void testIfMatchOverwriteDeletedFileWithEtag() throws Throwable {
     // overwrite file with etag. Should throw FileNotFoundException
     FileNotFoundException exception = intercept(FileNotFoundException.class,
             () -> createFileWithFlags(fs, path, SMALL_FILE_BYTES, false, etag));
-    assertS3ExceptionStatusCode(SC_404_NOT_FOUND, exception);
+    verifyS3ExceptionStatusCode(SC_404_NOT_FOUND, exception);
   }
 
   @Test
@@ -553,8 +578,7 @@ public void testIfMatchTwoMultipartUploadsRaceConditionOneClosesFirst() throws T
     stream1.close();
 
     // Close second stream, should fail due to etag mismatch
-    RemoteFileChangedException exception = intercept(RemoteFileChangedException.class, stream2::close);
-    assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception);
+    expectPreconditionFailure(stream2::close);
   }
 
   @Disabled("conditional_write statistics not yet fully implemented")
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestTreewalkProblems.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestTreewalkProblems.java
index 965512f..a2b016c 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestTreewalkProblems.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestTreewalkProblems.java
@@ -58,6 +58,7 @@
 import static org.apache.hadoop.fs.s3a.MultipartTestUtils.clearAnyUploads;
 import static org.apache.hadoop.fs.s3a.MultipartTestUtils.createMagicFile;
 import static org.apache.hadoop.fs.s3a.MultipartTestUtils.magicPath;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeMultipartUploads;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.toPathList;
 import static org.apache.hadoop.fs.s3a.S3AUtils.HIDDEN_FILE_FILTER;
@@ -106,6 +107,7 @@ public void setup() throws Exception {
     final S3AFileSystem fs = getFileSystem();
     final Path path = methodPath();
     assertHasPathCapabilities(fs, path, DIRECTORY_OPERATIONS_PURGE_UPLOADS);
+    assumeMultipartUploads(fs.getConf());
     listingInconsistent = fs.hasPathCapability(path, DIRECTORY_LISTING_INCONSISTENT);
     clearAnyUploads(fs, path);
   }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestUploadPurgeOnDirectoryOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestUploadPurgeOnDirectoryOperations.java
index 301f348..5c12dfe 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestUploadPurgeOnDirectoryOperations.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestUploadPurgeOnDirectoryOperations.java
@@ -36,6 +36,7 @@
 import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_OPERATIONS_PURGE_UPLOADS;
 import static org.apache.hadoop.fs.s3a.MultipartTestUtils.clearAnyUploads;
 import static org.apache.hadoop.fs.s3a.MultipartTestUtils.createMagicFile;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeMultipartUploads;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_LIST;
 import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_MULTIPART_UPLOAD_ABORTED;
@@ -65,6 +66,7 @@ public Configuration createConfiguration() {
   public void setup() throws Exception {
     super.setup();
     final S3AFileSystem fs = getFileSystem();
+    assumeMultipartUploads(fs.getConf());
     assertHasPathCapabilities(fs, new Path("/"),
         DIRECTORY_OPERATIONS_PURGE_UPLOADS);
     clearAnyUploads(fs, methodPath());
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestChecksumSupport.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestChecksumSupport.java
index 43187e6..71d84f8 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestChecksumSupport.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestChecksumSupport.java
@@ -31,16 +31,39 @@
 public class TestChecksumSupport {
 
   @ParameterizedTest
-  @EnumSource(value = ChecksumAlgorithm.class, names = {"CRC32", "CRC32_C", "SHA1", "SHA256"})
+  @EnumSource(value = ChecksumAlgorithm.class,
+      names = {"CRC32", "CRC32_C", "SHA1", "SHA256", "CRC64_NVME"})
   public void testGetSupportedChecksumAlgorithm(ChecksumAlgorithm checksumAlgorithm) {
-    final Configuration conf = new Configuration();
-    conf.set(CHECKSUM_ALGORITHM, checksumAlgorithm.toString());
+    assertChecksumAlgorithm(checksumAlgorithm, checksumAlgorithm.toString());
+  }
+
+  /**
+   * Assert that a checksum algorithm string resolves to a value.
+   * @param checksumAlgorithm expected value
+   * @param algorithm algorithm name
+   */
+  private static void assertChecksumAlgorithm(final ChecksumAlgorithm checksumAlgorithm,
+      final String algorithm) {
+    final Configuration conf = new Configuration(false);
+    conf.set(CHECKSUM_ALGORITHM, algorithm);
     Assertions.assertThat(ChecksumSupport.getChecksumAlgorithm(conf))
         .describedAs("Checksum algorithm must match value set in the configuration")
         .isEqualTo(checksumAlgorithm);
   }
 
   @Test
+  public void testCRC32C() throws Throwable {
+    assertChecksumAlgorithm(ChecksumAlgorithm.CRC32_C, "CRC32C");
+    assertChecksumAlgorithm(ChecksumAlgorithm.CRC32_C, "CRC32_C");
+  }
+
+  @Test
+  public void testCRC64NVME() throws Throwable {
+    assertChecksumAlgorithm(ChecksumAlgorithm.CRC64_NVME, "CRC64_NVME");
+    assertChecksumAlgorithm(ChecksumAlgorithm.CRC64_NVME, "CRC64NVME");
+  }
+
+  @Test
   public void testGetChecksumAlgorithmWhenNull() {
     final Configuration conf = new Configuration();
     conf.unset(CHECKSUM_ALGORITHM);
@@ -57,4 +80,5 @@ public void testGetNotSupportedChecksumAlgorithm() {
         .describedAs("Invalid checksum algorithm should throw an exception")
         .isInstanceOf(IllegalArgumentException.class);
   }
+
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestCreateFileCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestCreateFileCost.java
index 9549a0c..29fa6d7 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestCreateFileCost.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestCreateFileCost.java
@@ -41,6 +41,7 @@
 
 import static java.util.Objects.requireNonNull;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.toChar;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CONDITIONAL_CREATE_ENABLED;
 import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_HEADER;
 import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE;
 import static org.apache.hadoop.fs.s3a.Constants.XA_HEADER_PREFIX;
@@ -201,9 +202,14 @@ public void testCreateBuilderSequence() throws Throwable {
           () -> buildFile(testFile, false, true,
               GET_FILE_STATUS_ON_FILE));
     } else {
-      // will trigger conditional create and throw RemoteFileChangedException
-      intercept(RemoteFileChangedException.class,
-              () -> buildFile(testFile, false, true, NO_HEAD_OR_LIST));
+      if (getFileSystem().getConf().getBoolean(FS_S3A_CONDITIONAL_CREATE_ENABLED, true)) {
+        // will trigger conditional create and throw RemoteFileChangedException
+        intercept(RemoteFileChangedException.class,
+            () -> buildFile(testFile, false, true, NO_HEAD_OR_LIST));
+      } else {
+        // third party store w/out conditional overwrite support
+        buildFile(testFile, false, true, NO_HEAD_OR_LIST);
+      }
     }
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3ADeleteCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3ADeleteCost.java
index 2727665..63f4ec7 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3ADeleteCost.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3ADeleteCost.java
@@ -97,14 +97,12 @@ public void testDeleteSingleFileInDir() throws Throwable {
             FILESTATUS_FILE_PROBE_L + FILESTATUS_DIR_PROBE_L),
         with(DIRECTORIES_DELETED, 0),
         with(FILES_DELETED, 1),
-
         // a single DELETE call is made to delete the object
-        probe(bulkDelete, OBJECT_DELETE_REQUEST, DELETE_OBJECT_REQUEST),
-        probe(!bulkDelete, OBJECT_DELETE_REQUEST,
-            DELETE_OBJECT_REQUEST + DELETE_MARKER_REQUEST),
+        with(OBJECT_DELETE_REQUEST, DELETE_OBJECT_REQUEST),
 
         // create no parent dirs or delete parents
         with(DIRECTORIES_CREATED, 0),
+        // even when bulk delete is enabled, there is no use of this.
         with(OBJECT_BULK_DELETE_REQUEST, 0)
     );
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardTool.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardTool.java
index 995a20e..dad1c87 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardTool.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardTool.java
@@ -44,6 +44,7 @@
 import static org.apache.hadoop.fs.s3a.MultipartTestUtils.clearAnyUploads;
 import static org.apache.hadoop.fs.s3a.MultipartTestUtils.countUploadsAt;
 import static org.apache.hadoop.fs.s3a.MultipartTestUtils.createPartUpload;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeMultipartUploads;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.BucketInfo;
 import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.E_BAD_STATE;
@@ -129,6 +130,7 @@ public void testStoreInfoFips() throws Throwable {
 
   @Test
   public void testUploads() throws Throwable {
+    assumeMultipartUploads(getFileSystem().getConf());
     S3AFileSystem fs = getFileSystem();
     Path path = methodPath();
     Path file = new Path(path, UPLOAD_NAME);
@@ -173,14 +175,17 @@ public void testUploads() throws Throwable {
     }
   }
 
+
   @Test
   public void testUploadListByAge() throws Throwable {
     S3AFileSystem fs = getFileSystem();
     Path path = methodPath();
     Path file = new Path(path, UPLOAD_NAME);
+    assumeMultipartUploads(getFileSystem().getConf());
 
     describe("Cleaning up any leftover uploads from previous runs.");
 
+
     // 1. Make sure key doesn't already exist
     clearAnyUploads(fs, path);
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
index 9c864d1..591aebf 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
@@ -108,6 +108,9 @@ public void setup() throws Exception {
     uploadBlockSize = uploadBlockSize();
     filesize = getTestPropertyBytes(getConf(), KEY_HUGE_FILESIZE,
         DEFAULT_HUGE_FILESIZE);
+    if (requireMultipartUploads()) {
+      assumeMultipartUploads(getFileSystem().getConf());
+    }
   }
 
   /**
@@ -130,6 +133,14 @@ public String getTestSuiteName() {
   }
 
   /**
+   * Override point: does this test suite require MPUs?
+   * @return true if the test suite must be skipped if MPUS are off.
+   */
+  protected boolean requireMultipartUploads() {
+    return false;
+  }
+
+  /**
    * Note that this can get called before test setup.
    * @return the configuration to use.
    */
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ABlockOutputStreamInterruption.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ABlockOutputStreamInterruption.java
index c04261e..1bb99ee 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ABlockOutputStreamInterruption.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ABlockOutputStreamInterruption.java
@@ -56,6 +56,7 @@
 import static org.apache.hadoop.fs.s3a.Constants.MAX_ERROR_RETRIES;
 import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
 import static org.apache.hadoop.fs.s3a.Constants.RETRY_HTTP_5XX_ERRORS;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeMultipartUploads;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyInt;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_MULTIPART_UPLOAD_ABORTED;
@@ -84,6 +85,9 @@
  * <p>
  * Marked as a scale test even though it tries to aggressively abort streams being written
  * and should, if working, complete fast.
+ * <p>
+ * Assumes multipart uploads are enabled; single part upload interruptions aren't the complicated
+ * ones.
  */
 @ParameterizedClass(name = "{0}-{1}")
 @MethodSource("params")
@@ -171,6 +175,7 @@ protected Configuration createScaleConfiguration() {
   public void setup() throws Exception {
     SdkFaultInjector.resetFaultInjector();
     super.setup();
+    assumeMultipartUploads(getFileSystem().getConf());
   }
 
   @AfterEach
@@ -287,6 +292,7 @@ public void testAbortDuringUpload() throws Throwable {
   @Test
   public void testPartUploadFailure() throws Throwable {
     describe("Trigger a failure during a multipart upload");
+    assumeMultipartUploads(getFileSystem().getConf());
     int len = 6 * _1MB;
     final byte[] dataset = dataset(len, 'a', 'z' - 'a');
     final String text = "Simulated failure";
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesArrayBlocks.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesArrayBlocks.java
index 33dfdc6..4b8322b 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesArrayBlocks.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesArrayBlocks.java
@@ -30,4 +30,13 @@ public class ITestS3AHugeFilesArrayBlocks extends AbstractSTestS3AHugeFiles {
   protected String getBlockOutputBufferName() {
     return Constants.FAST_UPLOAD_BUFFER_ARRAY;
   }
+
+  /**
+   * Skip this test suite when MPUS are not avaialable.
+   * @return false
+   */
+  @Override
+  protected boolean requireMultipartUploads() {
+    return true;
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesByteBufferBlocks.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesByteBufferBlocks.java
index 3b7b4ca..ff660bf3 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesByteBufferBlocks.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesByteBufferBlocks.java
@@ -43,6 +43,15 @@ protected String getBlockOutputBufferName() {
   }
 
   /**
+   * Skip this test suite when MPUS are not avaialable.
+   * @return false
+   */
+  @Override
+  protected boolean requireMultipartUploads() {
+    return true;
+  }
+
+  /**
    * Rename the parent directory, rather than the file itself.
    * @param src source file
    * @param dest dest file
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AMultipartUploadSizeLimits.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AMultipartUploadSizeLimits.java
index cee917d..a00126e 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AMultipartUploadSizeLimits.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AMultipartUploadSizeLimits.java
@@ -21,6 +21,7 @@
 import java.io.File;
 
 import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import org.apache.commons.io.FileUtils;
@@ -44,6 +45,7 @@
 import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyFileContents;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
 import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeMultipartUploads;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_ABORT;
 import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_MULTIPART_UPLOAD_ABORTED;
@@ -76,6 +78,13 @@ protected Configuration createScaleConfiguration() {
     return configuration;
   }
 
+  @Override
+  @BeforeEach
+  public void setup() throws Exception {
+    super.setup();
+    assumeMultipartUploads(getFileSystem().getConf());
+  }
+
   /**
    * Uploads under the limit are valid.
    */
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestAWSStatisticCollection.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestAWSStatisticCollection.java
index 7e6b599..2e2c8a2 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestAWSStatisticCollection.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestAWSStatisticCollection.java
@@ -19,12 +19,17 @@
 package org.apache.hadoop.fs.s3a.statistics;
 
 import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
 import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
 
-import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE;
+import static org.apache.hadoop.fs.s3a.Constants.S3EXPRESS_CREATE_SESSION;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.setPerformanceFlags;
 import static org.apache.hadoop.fs.s3a.Statistic.STORE_IO_REQUEST;
 
 /**
@@ -32,19 +37,27 @@
  */
 public class ITestAWSStatisticCollection extends AbstractS3ACostTest {
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestAWSStatisticCollection.class);
+
   @Override
   public Configuration createConfiguration() {
     final Configuration conf = super.createConfiguration();
-    conf.setBoolean(FS_S3A_CREATE_PERFORMANCE, true);
+    S3ATestUtils.removeBaseAndBucketOverrides(conf,
+        S3EXPRESS_CREATE_SESSION);
+    setPerformanceFlags(conf, "create");
+    conf.setBoolean(S3EXPRESS_CREATE_SESSION, false);
     return conf;
   }
 
   @Test
   public void testSDKMetricsCostOfGetFileStatusOnFile() throws Throwable {
-    describe("performing getFileStatus on a file");
+    describe("Performing getFileStatus() on a file");
     Path simpleFile = file(methodPath());
     // and repeat on the file looking at AWS wired up stats
-    verifyMetrics(() -> getFileSystem().getFileStatus(simpleFile),
+    final S3AFileSystem fs = getFileSystem();
+    LOG.info("Initiating GET request for {}", simpleFile);
+    verifyMetrics(() -> fs.getFileStatus(simpleFile),
         with(STORE_IO_REQUEST, 1));
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/yarn/ITestS3AMiniYarnCluster.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/yarn/ITestS3AMiniYarnCluster.java
index c74faa2..9c8773b 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/yarn/ITestS3AMiniYarnCluster.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/yarn/ITestS3AMiniYarnCluster.java
@@ -46,6 +46,7 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeMultipartUploads;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_NAME;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants._SUCCESS;
@@ -74,6 +75,7 @@ public void setup() throws Exception {
     super.setup();
     S3AFileSystem fs = getFileSystem();
     Configuration conf = getConfiguration();
+    assumeMultipartUploads(fs.getConf());
     rootPath = path("MiniClusterWordCount");
     Path workingDir = path("working");
     fs.setWorkingDirectory(workingDir);
diff --git a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties
index f616686..fe65174 100644
--- a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties
+++ b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties
@@ -68,8 +68,13 @@
 # include sensitive information such as account IDs in HTTP headers.
 # log4j.logger.software.amazon.awssdk.request=DEBUG
 
+# Log TLS info
+#log4j.logger.software.amazon.awssdk.thirdparty.org.apache.http.conn.ssl.SSLConnectionSocketFactory=DEBUG
+
+
 # Turn on low level HTTP protocol debugging
-#log4j.logger.org.apache.http.wire=DEBUG
+#log4j.logger.software.amazon.awssdk.thirdparty.org.apache.http.wire=DEBUG
+#log4j.logger.software.amazon.awssdk.thirdparty.org.apache.http=DEBUG
 
 # async client
 #log4j.logger.io.netty.handler.logging=DEBUG