blob: 3d7cdfc08dec4d6017877daef4802003e5088032 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import java.util.concurrent.atomic.AtomicLong;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.SdkBaseException;
import com.amazonaws.services.s3.Headers;
import com.amazonaws.services.s3.model.CopyObjectRequest;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.transfer.model.CopyResult;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
import org.apache.hadoop.test.HadoopTestBase;
import static org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy.CHANGE_DETECTED;
import static org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy.createPolicy;
import static org.apache.hadoop.fs.s3a.impl.ChangeTracker.CHANGE_REPORTED_BY_S3;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Test {@link ChangeTracker}.
*/
public class TestStreamChangeTracker extends HadoopTestBase {

  private static final Logger LOG =
      LoggerFactory.getLogger(TestStreamChangeTracker.class);

  /** Bucket name used in every request and response built here. */
  public static final String BUCKET = "bucket";

  /** Key of the (source) object. */
  public static final String OBJECT = "object";

  /** Key of the destination object in copy requests and results. */
  public static final String DEST_OBJECT = "new_object";

  /** S3A URI of the source object; passed to the tracker constructor. */
  public static final String URI = "s3a://" + BUCKET + "/" + OBJECT;

  /** {@link #URI} as a path; used when building object attributes. */
  public static final Path PATH = new Path(URI);

  /**
   * Client mode, version ID source, version not required: a response
   * carrying neither etag nor version ID is processed without any
   * mismatch being counted.
   */
  @Test
  public void testVersionCheckingHandlingNoVersions() throws Throwable {
    LOG.info("If an endpoint doesn't return versions, that's OK");
    ChangeTracker tracker = newTracker(
        ChangeDetectionPolicy.Mode.Client,
        ChangeDetectionPolicy.Source.VersionId,
        false);
    assertFalse("Tracker should not have applied constraints " + tracker,
        tracker.maybeApplyConstraint(newGetObjectRequest()));
    tracker.processResponse(
        newResponse(null, null),
        "", 0);
    assertTrackerMismatchCount(tracker, 0);
  }

  /**
   * Client mode, version ID source, version required: a response with
   * no version ID must be rejected with a
   * {@code NoVersionAttributeException}.
   */
  @Test
  public void testVersionCheckingHandlingNoVersionsVersionRequired()
      throws Throwable {
    LOG.info("If an endpoint doesn't return versions but we are configured to"
        + " require them");
    ChangeTracker tracker = newTracker(
        ChangeDetectionPolicy.Mode.Client,
        ChangeDetectionPolicy.Source.VersionId,
        true);
    expectNoVersionAttributeException(tracker, newResponse(null, null),
        "policy requires VersionId");
  }

  /**
   * Warn mode, etag source: a changed etag is counted as one mismatch
   * and no exception is raised; a repeat of the changed etag does not
   * raise the count further.
   */
  @Test
  public void testEtagCheckingWarn() throws Throwable {
    LOG.info("If an endpoint doesn't return errors, that's OK");
    ChangeTracker tracker = newTracker(
        ChangeDetectionPolicy.Mode.Warn,
        ChangeDetectionPolicy.Source.ETag,
        false);
    assertFalse("Tracker should not have applied constraints " + tracker,
        tracker.maybeApplyConstraint(newGetObjectRequest()));
    tracker.processResponse(
        newResponse("e1", null),
        "", 0);
    tracker.processResponse(
        newResponse("e1", null),
        "", 0);
    tracker.processResponse(
        newResponse("e2", null),
        "", 0);
    assertTrackerMismatchCount(tracker, 1);
    // a subsequent response with the changed etag doesn't trigger
    // another warning
    tracker.processResponse(
        newResponse("e2", null),
        "", 0);
    assertTrackerMismatchCount(tracker, 1);
  }

  /**
   * Client mode, version ID source: no constraint is applied to the
   * request; the first response's version becomes the revision ID, and
   * every later response with a different version raises a
   * {@link RemoteFileChangedException} and increments the mismatch count.
   */
  @Test
  public void testVersionCheckingOnClient() throws Throwable {
    LOG.info("Verify the client-side version checker raises exceptions");
    ChangeTracker tracker = newTracker(
        ChangeDetectionPolicy.Mode.Client,
        ChangeDetectionPolicy.Source.VersionId,
        false);
    assertFalse("Tracker should not have applied constraints " + tracker,
        tracker.maybeApplyConstraint(newGetObjectRequest()));
    tracker.processResponse(
        newResponse(null, "rev1"),
        "", 0);
    assertTrackerMismatchCount(tracker, 0);
    assertRevisionId(tracker, "rev1");
    expectChangeException(tracker,
        newResponse(null, "rev2"), "change detected");
    // mismatch was noted (so gets to FS stats)
    assertTrackerMismatchCount(tracker, 1);
    // another read causes another exception
    expectChangeException(tracker,
        newResponse(null, "rev2"), "change detected");
    // mismatch was noted again
    assertTrackerMismatchCount(tracker, 2);
  }

  /**
   * Server mode, version ID source: once a revision ID is known the
   * tracker applies a constraint to GET requests; a null response is
   * then reported as a change, and so is a response which carries a
   * different version ID.
   */
  @Test
  public void testVersionCheckingOnServer() throws Throwable {
    LOG.info("Verify the client-side version checker handles null-ness");
    ChangeTracker tracker = newTracker(
        ChangeDetectionPolicy.Mode.Server,
        ChangeDetectionPolicy.Source.VersionId,
        false);
    // no revision ID is known yet, so there is nothing to constrain on
    assertFalse("Tracker should not have applied constraints " + tracker,
        tracker.maybeApplyConstraint(newGetObjectRequest()));
    tracker.processResponse(
        newResponse(null, "rev1"),
        "", 0);
    assertTrackerMismatchCount(tracker, 0);
    assertRevisionId(tracker, "rev1");
    GetObjectRequest request = newGetObjectRequest();
    assertConstraintApplied(tracker, request);
    // now, the tracker expects a null response
    expectChangeException(tracker, null, CHANGE_REPORTED_BY_S3);
    assertTrackerMismatchCount(tracker, 1);
    // now, imagine the server doesn't trigger a failure due to some
    // bug in its logic
    // we should still react to the reported value
    expectChangeException(tracker,
        newResponse(null, "rev2"),
        CHANGE_DETECTED);
  }

  /**
   * With the etag source, a tracker created with known object attributes
   * takes its revision ID from the etag.
   */
  @Test
  public void testVersionCheckingUpfrontETag() throws Throwable {
    ChangeTracker tracker = newTracker(
        ChangeDetectionPolicy.Mode.Server,
        ChangeDetectionPolicy.Source.ETag,
        false,
        objectAttributes("etag1", "versionid1"));
    assertEquals("etag1", tracker.getRevisionId());
  }

  /**
   * With the version ID source, a tracker created with known object
   * attributes takes its revision ID from the version ID.
   */
  @Test
  public void testVersionCheckingUpfrontVersionId() throws Throwable {
    ChangeTracker tracker = newTracker(
        ChangeDetectionPolicy.Mode.Server,
        ChangeDetectionPolicy.Source.VersionId,
        false,
        objectAttributes("etag1", "versionid1"));
    assertEquals("versionid1", tracker.getRevisionId());
  }

  /**
   * Server mode with known object attributes: a constraint is applied
   * to copy requests.
   */
  @Test
  public void testVersionCheckingETagCopyServer() throws Throwable {
    ChangeTracker tracker = newTracker(
        ChangeDetectionPolicy.Mode.Server,
        ChangeDetectionPolicy.Source.VersionId,
        false,
        objectAttributes("etag1", "versionid1"));
    assertConstraintApplied(tracker, newCopyObjectRequest());
  }

  /**
   * Client mode with known object attributes: no constraint is applied
   * to copy requests.
   */
  @Test
  public void testVersionCheckingETagCopyClient() throws Throwable {
    ChangeTracker tracker = newTracker(
        ChangeDetectionPolicy.Mode.Client,
        ChangeDetectionPolicy.Source.VersionId,
        false,
        objectAttributes("etag1", "versionid1"));
    assertFalse("Tracker should not have applied constraints " + tracker,
        tracker.maybeApplyConstraint(newCopyObjectRequest()));
  }

  /**
   * Version ID required: a copy result without a version ID is rejected
   * with a {@code NoVersionAttributeException}.
   */
  @Test
  public void testCopyVersionIdRequired() throws Throwable {
    ChangeTracker tracker = newTracker(
        ChangeDetectionPolicy.Mode.Client,
        ChangeDetectionPolicy.Source.VersionId,
        true,
        objectAttributes("etag1", "versionId"));
    expectNoVersionAttributeException(tracker,
        newCopyResult("etag1", null),
        "policy requires VersionId");
  }

  /**
   * Etag required: a copy result without an etag is rejected with a
   * {@code NoVersionAttributeException}.
   */
  @Test
  public void testCopyETagRequired() throws Throwable {
    ChangeTracker tracker = newTracker(
        ChangeDetectionPolicy.Mode.Client,
        ChangeDetectionPolicy.Source.ETag,
        true,
        objectAttributes("etag1", "versionId"));
    expectNoVersionAttributeException(tracker,
        newCopyResult(null, "versionId"),
        "policy requires ETag");
  }

  /**
   * A service exception with the precondition-failed status code is
   * translated into a {@link RemoteFileChangedException}; another SDK
   * exception type passes through {@code processException} without
   * anything being thrown.
   */
  @Test
  public void testCopyVersionMismatch() throws Throwable {
    ChangeTracker tracker = newTracker(
        ChangeDetectionPolicy.Mode.Server,
        ChangeDetectionPolicy.Source.ETag,
        true,
        objectAttributes("etag", "versionId"));

    // 412 is translated to RemoteFileChangedException
    // note: this scenario is never currently hit due to
    // https://github.com/aws/aws-sdk-java/issues/1644
    AmazonServiceException awsException =
        new AmazonServiceException("aws exception");
    awsException.setStatusCode(ChangeTracker.SC_PRECONDITION_FAILED);
    expectChangeException(tracker, awsException, "copy",
        RemoteFileChangedException.PRECONDITIONS_FAILED);

    // processing another type of exception does nothing
    tracker.processException(new SdkBaseException("foo"), "copy");
  }

  /**
   * Assert that the tracker applies a constraint to a GET request.
   * @param tracker tracker under test
   * @param request request offered to the tracker
   */
  protected void assertConstraintApplied(final ChangeTracker tracker,
      final GetObjectRequest request) {
    assertTrue("Tracker should have applied constraints " + tracker,
        tracker.maybeApplyConstraint(request));
  }

  /**
   * Assert that the tracker applies a constraint to a copy request.
   * @param tracker tracker under test
   * @param request request offered to the tracker
   * @throws PathIOException declared for callers; propagated as-is
   */
  protected void assertConstraintApplied(final ChangeTracker tracker,
      final CopyObjectRequest request) throws PathIOException {
    assertTrue("Tracker should have applied constraints " + tracker,
        tracker.maybeApplyConstraint(request));
  }

  /**
   * Expect a {@link RemoteFileChangedException} when a GET response
   * is processed.
   * @param tracker tracker under test
   * @param response response to process; may be null
   * @param message text which must be found in the exception
   * @return the exception raised
   * @throws Exception any other failure, including no exception raised
   */
  protected RemoteFileChangedException expectChangeException(
      final ChangeTracker tracker,
      final S3Object response,
      final String message) throws Exception {
    return expectException(tracker, response, message,
        RemoteFileChangedException.class);
  }

  /**
   * Expect a {@link RemoteFileChangedException} when an SDK exception
   * is processed.
   * @param tracker tracker under test
   * @param exception exception handed to the tracker
   * @param operation operation name handed to the tracker
   * @param message text which must be found in the exception
   * @return the exception raised
   * @throws Exception any other failure, including no exception raised
   */
  protected RemoteFileChangedException expectChangeException(
      final ChangeTracker tracker,
      final SdkBaseException exception,
      final String operation,
      final String message) throws Exception {
    return expectException(tracker, exception, operation, message,
        RemoteFileChangedException.class);
  }

  /**
   * Expect a {@code NoVersionAttributeException} when a GET response
   * is processed.
   * @param tracker tracker under test
   * @param response response to process
   * @param message text which must be found in the exception
   * @return the exception raised
   * @throws Exception any other failure, including no exception raised
   */
  protected PathIOException expectNoVersionAttributeException(
      final ChangeTracker tracker,
      final S3Object response,
      final String message) throws Exception {
    return expectException(tracker, response, message,
        NoVersionAttributeException.class);
  }

  /**
   * Expect a {@code NoVersionAttributeException} when a copy result
   * is processed.
   * @param tracker tracker under test
   * @param response copy result to process
   * @param message text which must be found in the exception
   * @return the exception raised
   * @throws Exception any other failure, including no exception raised
   */
  protected PathIOException expectNoVersionAttributeException(
      final ChangeTracker tracker,
      final CopyResult response,
      final String message) throws Exception {
    return expectException(tracker, response, message,
        NoVersionAttributeException.class);
  }

  /**
   * Process a GET response and expect an exception of the given class.
   * @param tracker tracker under test
   * @param response response to process; may be null
   * @param message text which must be found in the exception
   * @param clazz expected exception class
   * @param <T> exception type
   * @return the exception raised
   * @throws Exception any other failure, including no exception raised
   */
  protected <T extends Exception> T expectException(
      final ChangeTracker tracker,
      final S3Object response,
      final String message,
      final Class<T> clazz) throws Exception {
    return intercept(
        clazz,
        message,
        () -> {
          tracker.processResponse(response, "", 0);
          // returned so that a failure to throw reports the tracker
          return tracker;
        });
  }

  /**
   * Process a copy result and expect an exception of the given class.
   * @param tracker tracker under test
   * @param response copy result to process
   * @param message text which must be found in the exception
   * @param clazz expected exception class
   * @param <T> exception type
   * @return the exception raised
   * @throws Exception any other failure, including no exception raised
   */
  protected <T extends Exception> T expectException(
      final ChangeTracker tracker,
      final CopyResult response,
      final String message,
      final Class<T> clazz) throws Exception {
    return intercept(
        clazz,
        message,
        () -> {
          tracker.processResponse(response);
          // returned so that a failure to throw reports the tracker
          return tracker;
        });
  }

  /**
   * Process an SDK exception and expect an exception of the given class.
   * @param tracker tracker under test
   * @param exception exception handed to the tracker
   * @param operation operation name handed to the tracker
   * @param message text which must be found in the exception
   * @param clazz expected exception class
   * @param <T> exception type
   * @return the exception raised
   * @throws Exception any other failure, including no exception raised
   */
  protected <T extends Exception> T expectException(
      final ChangeTracker tracker,
      final SdkBaseException exception,
      final String operation,
      final String message,
      final Class<T> clazz) throws Exception {
    return intercept(
        clazz,
        message,
        () -> {
          tracker.processException(exception, operation);
          // returned so that a failure to throw reports the tracker
          return tracker;
        });
  }

  /**
   * Assert that the tracker holds the given revision ID.
   * @param tracker tracker under test
   * @param revId expected revision ID
   */
  protected void assertRevisionId(final ChangeTracker tracker,
      final String revId) {
    assertEquals("Wrong revision ID in " + tracker,
        revId, tracker.getRevisionId());
  }

  /**
   * Assert that the tracker's version mismatch counter has the
   * expected value.
   * @param tracker tracker under test
   * @param expectedCount expected counter value
   */
  protected void assertTrackerMismatchCount(
      final ChangeTracker tracker,
      final int expectedCount) {
    assertEquals("counter in tracker " + tracker,
        expectedCount, tracker.getVersionMismatches().get());
  }

  /**
   * Create a tracker for an object whose etag and version ID are
   * both unknown.
   * Contains standard assertion(s).
   * @param mode change detection mode
   * @param source change detection source
   * @param requireVersion is the version attribute required?
   * @return the tracker.
   */
  protected ChangeTracker newTracker(final ChangeDetectionPolicy.Mode mode,
      final ChangeDetectionPolicy.Source source, boolean requireVersion) {
    return newTracker(mode, source, requireVersion,
        objectAttributes(null, null));
  }

  /**
   * Create a tracker.
   * Contains standard assertion(s): when the attributes carry neither
   * version ID nor etag, the new tracker must not apply a constraint
   * to a GET request.
   * @param mode change detection mode
   * @param source change detection source
   * @param requireVersion is the version attribute required?
   * @param objectAttributes attributes of the object being tracked
   * @return the tracker.
   */
  protected ChangeTracker newTracker(final ChangeDetectionPolicy.Mode mode,
      final ChangeDetectionPolicy.Source source, boolean requireVersion,
      S3ObjectAttributes objectAttributes) {
    ChangeDetectionPolicy policy = createPolicy(
        mode,
        source,
        requireVersion);
    ChangeTracker tracker = new ChangeTracker(URI, policy,
        new AtomicLong(0), objectAttributes);
    if (objectAttributes.getVersionId() == null
        && objectAttributes.getETag() == null) {
      assertFalse("Tracker should not have applied constraints " + tracker,
          tracker.maybeApplyConstraint(newGetObjectRequest()));
    }
    return tracker;
  }

  /**
   * @return a new GET request for {@link #OBJECT} in {@link #BUCKET}.
   */
  private GetObjectRequest newGetObjectRequest() {
    return new GetObjectRequest(BUCKET, OBJECT);
  }

  /**
   * @return a new request to copy {@link #OBJECT} to
   * {@link #DEST_OBJECT} within {@link #BUCKET}.
   */
  private CopyObjectRequest newCopyObjectRequest() {
    return new CopyObjectRequest(BUCKET, OBJECT, BUCKET, DEST_OBJECT);
  }

  /**
   * Build a copy result for the standard source and destination.
   * @param eTag etag to set; may be null
   * @param versionId version ID to set; may be null
   * @return the copy result
   */
  private CopyResult newCopyResult(String eTag, String versionId) {
    CopyResult copyResult = new CopyResult();
    copyResult.setSourceBucketName(BUCKET);
    copyResult.setSourceKey(OBJECT);
    copyResult.setDestinationBucketName(BUCKET);
    copyResult.setDestinationKey(DEST_OBJECT);
    copyResult.setETag(eTag);
    copyResult.setVersionId(versionId);
    return copyResult;
  }

  /**
   * Build a GET response whose metadata carries the given attributes.
   * @param etag etag header to set; omitted when null
   * @param versionId version ID header to set; omitted when null
   * @return the response
   */
  private S3Object newResponse(String etag, String versionId) {
    ObjectMetadata md = new ObjectMetadata();
    if (etag != null) {
      md.setHeader(Headers.ETAG, etag);
    }
    if (versionId != null) {
      md.setHeader(Headers.S3_VERSION_ID, versionId);
    }
    S3Object response = emptyResponse();
    response.setObjectMetadata(md);
    return response;
  }

  /**
   * @return a response with only the bucket name and key set.
   */
  private S3Object emptyResponse() {
    S3Object response = new S3Object();
    response.setBucketName(BUCKET);
    response.setKey(OBJECT);
    return response;
  }

  /**
   * Build object attributes for the standard object.
   * @param etag etag of the object; may be null
   * @param versionId version ID of the object; may be null
   * @return the attributes
   */
  private S3ObjectAttributes objectAttributes(
      String etag, String versionId) {
    return new S3ObjectAttributes(BUCKET,
        PATH,
        OBJECT,
        null,
        null,
        etag,
        versionId,
        0);
  }
}