blob: 3e7833a5f82177d91b2d20893d9d6357c748ba54 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.fs.s3.common.writer;
import com.amazonaws.services.s3.model.PartETag;
import org.hamcrest.Description;
import org.hamcrest.TypeSafeMatcher;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import static org.hamcrest.MatcherAssert.assertThat;
/**
* Tests for the {@link S3RecoverableSerializer}.
*/
public class S3RecoverableSerializerTest {
private final S3RecoverableSerializer serializer = S3RecoverableSerializer.INSTANCE;
private static final String TEST_OBJECT_NAME = "TEST-OBJECT";
private static final String TEST_UPLOAD_ID = "TEST-UPLOAD-ID";
private static final String INCOMPLETE_OBJECT_NAME = "TEST-INCOMPLETE-PART";
private static final String ETAG_PREFIX = "TEST-ETAG-";
@Test
public void serializeEmptyS3Recoverable() throws IOException {
S3Recoverable originalEmptyRecoverable = createTestS3Recoverable(false);
byte[] serializedRecoverable = serializer.serialize(originalEmptyRecoverable);
S3Recoverable copiedEmptyRecoverable = serializer.deserialize(1, serializedRecoverable);
assertThat(originalEmptyRecoverable, isEqualTo(copiedEmptyRecoverable));
}
@Test
public void serializeS3RecoverableWithoutIncompleteObject() throws IOException {
S3Recoverable originalNoIncompletePartRecoverable = createTestS3Recoverable(false, 1, 5, 9);
byte[] serializedRecoverable = serializer.serialize(originalNoIncompletePartRecoverable);
S3Recoverable copiedNoIncompletePartRecoverable = serializer.deserialize(1, serializedRecoverable);
assertThat(originalNoIncompletePartRecoverable, isEqualTo(copiedNoIncompletePartRecoverable));
}
@Test
public void serializeS3RecoverableOnlyWithIncompleteObject() throws IOException {
S3Recoverable originalOnlyIncompletePartRecoverable = createTestS3Recoverable(true);
byte[] serializedRecoverable = serializer.serialize(originalOnlyIncompletePartRecoverable);
S3Recoverable copiedOnlyIncompletePartRecoverable = serializer.deserialize(1, serializedRecoverable);
assertThat(originalOnlyIncompletePartRecoverable, isEqualTo(copiedOnlyIncompletePartRecoverable));
}
@Test
public void serializeS3RecoverableWithCompleteAndIncompleteParts() throws IOException {
S3Recoverable originalFullRecoverable = createTestS3Recoverable(true, 1, 5, 9);
byte[] serializedRecoverable = serializer.serialize(originalFullRecoverable);
S3Recoverable copiedFullRecoverable = serializer.deserialize(1, serializedRecoverable);
assertThat(originalFullRecoverable, isEqualTo(copiedFullRecoverable));
}
// --------------------------------- Matchers ---------------------------------
private static TypeSafeMatcher<S3Recoverable> isEqualTo(S3Recoverable expectedRecoverable) {
return new TypeSafeMatcher<S3Recoverable>() {
@Override
protected boolean matchesSafely(S3Recoverable actualRecoverable) {
return Objects.equals(expectedRecoverable.getObjectName(), actualRecoverable.getObjectName())
&& Objects.equals(expectedRecoverable.uploadId(), actualRecoverable.uploadId())
&& expectedRecoverable.numBytesInParts() == actualRecoverable.numBytesInParts()
&& Objects.equals(expectedRecoverable.incompleteObjectName(), actualRecoverable.incompleteObjectName())
&& expectedRecoverable.incompleteObjectLength() == actualRecoverable.incompleteObjectLength()
&& compareLists(expectedRecoverable.parts(), actualRecoverable.parts());
}
private boolean compareLists(final List<PartETag> first, final List<PartETag> second) {
return Arrays.equals(
first.stream().map(PartETag::getETag).toArray(),
second.stream().map(PartETag::getETag).toArray()
);
}
@Override
public void describeTo(Description description) {
description.appendText(expectedRecoverable + " with ignored LAST_PART_OBJECT_NAME.");
}
};
}
// --------------------------------- Test Utils ---------------------------------
private static S3Recoverable createTestS3Recoverable(boolean withIncompletePart, int... partNumbers) {
List<PartETag> etags = new ArrayList<>();
for (int i : partNumbers) {
etags.add(createEtag(i));
}
if (withIncompletePart) {
return new S3Recoverable(TEST_OBJECT_NAME, TEST_UPLOAD_ID, etags, 12345L, INCOMPLETE_OBJECT_NAME, 54321L);
} else {
return new S3Recoverable(TEST_OBJECT_NAME, TEST_UPLOAD_ID, etags, 12345L);
}
}
private static PartETag createEtag(int partNumber) {
return new PartETag(partNumber, ETAG_PREFIX + partNumber);
}
}